aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoradheli.tavares <adheli.tavares@est.tech>2023-12-14 10:05:49 +0000
committeradheli.tavares <adheli.tavares@est.tech>2023-12-18 17:36:43 +0000
commit7d3f5bfd2e4fefe02c7f2fcf59981bb33f026419 (patch)
tree3eae2bb6ce22f335c180791de3c3d36b1a984d95
parent0813b37bd1f7421f1a777fdaff0e05f9b035fa32 (diff)
Apply lower case to any topics to be compatible with Kafka.
Issue-ID: POLICY-4402 Change-Id: Iebaec5f52a1fa0feb881ccfcb5319bc8a951b496 Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
-rw-r--r--integrity-audit/src/main/resources/META-INF/persistence.xml1
-rw-r--r--integrity-monitor/src/main/resources/META-INF/persistence.xml1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java9
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java16
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java27
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java23
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java8
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java58
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java5
16 files changed, 101 insertions, 82 deletions
diff --git a/integrity-audit/src/main/resources/META-INF/persistence.xml b/integrity-audit/src/main/resources/META-INF/persistence.xml
index 3a7fdd7f..63fea7b1 100644
--- a/integrity-audit/src/main/resources/META-INF/persistence.xml
+++ b/integrity-audit/src/main/resources/META-INF/persistence.xml
@@ -37,6 +37,7 @@
<persistence-unit name="integrityAuditPU" transaction-type="RESOURCE_LOCAL">
<!-- For operational use -->
+ <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<class>org.onap.policy.common.ia.jpa.IntegrityAuditEntity</class>
<shared-cache-mode>NONE</shared-cache-mode>
<properties>
diff --git a/integrity-monitor/src/main/resources/META-INF/persistence.xml b/integrity-monitor/src/main/resources/META-INF/persistence.xml
index 0adaae96..beff1432 100644
--- a/integrity-monitor/src/main/resources/META-INF/persistence.xml
+++ b/integrity-monitor/src/main/resources/META-INF/persistence.xml
@@ -25,6 +25,7 @@
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
<persistence-unit name="schemaPU" transaction-type="RESOURCE_LOCAL">
<!-- Limited use for generating the DB and schema files for imtest DB - uses hibernate -->
+ <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<class>org.onap.policy.common.im.jpa.ImTestEntity</class>
<class>org.onap.policy.common.im.jpa.StateManagementEntity</class>
<class>org.onap.policy.common.im.jpa.ForwardProgressEntity</class>
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java
index f36dfa31..b67756e5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,9 +33,9 @@ public interface TopicSink extends Topic {
*
* @return true if the send operation succeeded, false otherwise
* @throws IllegalArgumentException an invalid message has been provided
- * @throws IllegalStateException the entity is in an state that prevents
+ * @throws IllegalStateException the entity is in a state that prevents
* it from sending messages, for example, locked or stopped.
*/
- public boolean send(String message);
+ boolean send(String message);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
index e77beea1..5ca87732 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,12 +33,12 @@ public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
*
* @param partitionKey the partition key
*/
- public void setPartitionKey(String partitionKey);
+ void setPartitionKey(String partitionKey);
/**
* Return the partition key in used by the system to publish messages.
*
* @return the partition key
*/
- public String getPartitionKey();
+ String getPartitionKey();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
index 23aaabd4..f913926e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
/**
* KAFKA Topic Name Index.
@@ -98,7 +98,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
synchronized (this) {
for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
- addTopic(newKafkaTopicSinks, topic, properties);
+ addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties);
}
return newKafkaTopicSinks;
}
@@ -113,7 +113,8 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ",
+ this, name, value, topic));
String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
if (StringUtils.isBlank(servers)) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
index 1d586e43..151d8f69 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
+ private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
/**
* KAFKA Topic Name Index.
@@ -84,7 +84,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
synchronized (this) {
for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
- addTopic(newKafkaTopicSources, topic, properties);
+ addTopic(newKafkaTopicSources, topic.toLowerCase(), properties);
}
}
return newKafkaTopicSources;
@@ -110,11 +110,12 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
var props = new PropertyUtils(properties, topicPrefix,
- (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ",
+ this, name, value, topic));
String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
if (StringUtils.isBlank(servers)) {
- logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
+ logger.error("{}: no KAFKA servers configured for source {}", this, topic);
return;
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
index 8cb51df8..e5642daa 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,11 +28,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
public interface KafkaTopicSourceFactory {
/**
- * Creates an Kafka Topic Source based on properties files.
+ * Creates a Kafka Topic Source based on properties files.
*
* @param properties Properties containing initialization values
*
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
List<KafkaTopicSource> build(Properties properties);
@@ -41,7 +41,7 @@ public interface KafkaTopicSourceFactory {
* Instantiates a new Kafka Topic Source.
*
* @param busTopicParams parameters object
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
*/
KafkaTopicSource build(BusTopicParams busTopicParams);
@@ -51,13 +51,13 @@ public interface KafkaTopicSourceFactory {
* @param servers list of servers
* @param topic topic name
*
- * @return an Kafka Topic Source
+ * @return a Kafka Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
KafkaTopicSource build(List<String> servers, String topic);
/**
- * Destroys an Kafka Topic Source based on a topic.
+ * Destroys a Kafka Topic Source based on a topic.
*
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
@@ -70,10 +70,10 @@ public interface KafkaTopicSourceFactory {
void destroy();
/**
- * Gets an Kafka Topic Source based on topic name.
+ * Gets a Kafka Topic Source based on topic name.
*
* @param topic the topic name
- * @return an Kafka Topic Source with topic name
+ * @return a Kafka Topic Source with topic name
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the Kafka Topic Source is an incorrect state
*/
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index f9537f52..d6fa21b6 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -4,7 +4,7 @@
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
* Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -168,6 +168,26 @@ public class BusTopicParams {
return additionalProps != null;
}
+ public void setEffectiveTopic(String effectiveTopic) {
+ this.effectiveTopic = topicToLowerCase(effectiveTopic);
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topicToLowerCase(topic);
+ }
+
+ public String getEffectiveTopic() {
+ return topicToLowerCase(effectiveTopic);
+ }
+
+ public String getTopic() {
+ return topicToLowerCase(topic);
+ }
+
+ private String topicToLowerCase(String topic) {
+ return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase();
+ }
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
@@ -179,12 +199,12 @@ public class BusTopicParams {
}
public TopicParamsBuilder topic(String topic) {
- this.params.topic = topic;
+ this.params.setTopic(topic);
return this;
}
public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
- this.params.effectiveTopic = effectiveTopic;
+ this.params.setEffectiveTopic(effectiveTopic);
return this;
}
@@ -306,7 +326,6 @@ public class BusTopicParams {
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index 27ed5e7a..02626d34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -5,6 +5,7 @@
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -159,28 +160,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
@Override
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public String toString() {
return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
+ "]";
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index 6574d408..f605de92 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
/**
* Logger.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
- protected Map<String, String> additionalProps = null;
+ protected Map<String, String> additionalProps;
/**
- * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
+ * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
* attributes.
*
* <p>servers list of KAFKA servers available for publishing
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index 2a651ee7..713b4fd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
/**
- * This topic source implementation specializes in reading messages over an Kafka Bus topic source and
+ * This topic source implementation specializes in reading messages over a Kafka Bus topic source and
* notifying its listeners.
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
index 3372e0aa..c63fbcc2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
@@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic {
}
this.servers = servers;
- this.topic = topic;
- this.effectiveTopic = effectiveTopicCopy;
+ this.topic = topic.toLowerCase();
+ this.effectiveTopic = effectiveTopicCopy.toLowerCase();
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index 2a9f1446..b5d53909 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,11 +22,13 @@
package org.onap.policy.common.endpoints.event.comm.client;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -55,9 +58,9 @@ public class BidirectionalTopicClient {
private final CommInfrastructure sourceTopicCommInfrastructure;
/**
- * Used when checking whether or not a message sent on the sink topic can be received
+ * Used when checking whether a message sent on the sink topic can be received
* on the source topic. When a matching message is received on the incoming topic,
- * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
+ * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting
* thread is interrupted, then {@code false} is placed on the queue. Whenever a value
* is pulled from the queue, it is immediately placed back on the queue.
*/
@@ -72,8 +75,8 @@ public class BidirectionalTopicClient {
* @throws BidirectionalTopicClientException if either topic does not exist
*/
public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
- this.sinkTopic = sinkTopic;
- this.sourceTopic = sourceTopic;
+ this.sinkTopic = sinkTopic.toLowerCase();
+ this.sourceTopic = sourceTopic.toLowerCase();
// init sinkClient
List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
@@ -86,7 +89,7 @@ public class BidirectionalTopicClient {
this.sink = sinks.get(0);
// init source
- List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
+ List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic));
if (sources.isEmpty()) {
throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
} else if (sources.size() > 1) {
@@ -116,7 +119,7 @@ public class BidirectionalTopicClient {
}
/**
- * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
+ * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has
* previously returned {@code true}).
*
* @return {@code true}, if the topic is ready to send and receive
@@ -129,7 +132,7 @@ public class BidirectionalTopicClient {
* Waits for the bidirectional topic to become "ready" by publishing a message on the
* sink topic and awaiting receipt of the message on the source topic. If the message
* is not received within a few seconds, then it tries again. This process is
- * continued until the message is received, {@link #stop()} is called, or this thread
+ * continued until the message is received, {@link #stopWaiting()} is called, or this thread
* is interrupted. Once this returns, subsequent calls will return immediately, always
* with the same value.
*
@@ -150,24 +153,7 @@ public class BidirectionalTopicClient {
final String messageText = coder.encode(message);
// class of message to be decoded
- @SuppressWarnings("unchecked")
- final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
-
- // create a listener to detect when a matching message is received
- final TopicListener listener = (infra, topic, msg) -> {
- try {
- T incoming = decode(msg, clazz);
-
- if (message.equals(incoming)) {
- logger.info("topic {} is ready; found matching message {}", topic, incoming);
- checkerQueue.add(Boolean.TRUE);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot decode message from topic {}", topic, e);
- decodeFailed();
- }
- };
+ final TopicListener listener = getTopicListener(message);
source.register(listener);
@@ -193,6 +179,28 @@ public class BidirectionalTopicClient {
return checkerQueue.peek();
}
+ @NotNull
+ private <T> TopicListener getTopicListener(T message) {
+ @SuppressWarnings("unchecked")
+ final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
+
+ // create a listener to detect when a matching message is received
+ return (infra, topic, msg) -> {
+ try {
+ T incoming = decode(msg, clazz);
+
+ if (message.equals(incoming)) {
+ logger.info("topic {} is ready; found matching message {}", topic, incoming);
+ checkerQueue.add(Boolean.TRUE);
+ }
+
+ } catch (CoderException e) {
+ logger.warn("cannot decode message from topic {}", topic, e);
+ decodeFailed();
+ }
+ };
+ }
+
/**
* Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
* adding {@code false} to the queue.
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
index 3a5d727b..1037d3af 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,10 +21,13 @@
package org.onap.policy.common.endpoints.event.comm.client;
+import java.io.Serial;
+
/**
* Exception thrown by BidirectionalTopicClient class.
*/
public class BidirectionalTopicClientException extends Exception {
+ @Serial
private static final long serialVersionUID = 1L;
public BidirectionalTopicClientException() {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
index 9f8b3c06..5f49ea34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
@@ -56,9 +56,9 @@ public class TopicSinkClient {
* @throws TopicSinkClientException if the topic does not exist
*/
public TopicSinkClient(final String topic) throws TopicSinkClientException {
- final List<TopicSink> lst = getTopicSinks(topic);
+ final List<TopicSink> lst = getTopicSinks(topic.toLowerCase());
if (lst.isEmpty()) {
- throw new TopicSinkClientException("no sinks for topic: " + topic);
+ throw new TopicSinkClientException("no sinks for topic: " + topic.toLowerCase());
}
this.sink = lst.get(0);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java
index 608393b3..431d4f34 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java
@@ -3,7 +3,7 @@
* ONAP PAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +21,13 @@
package org.onap.policy.common.endpoints.event.comm.client;
+import java.io.Serial;
+
/**
* Exception thrown by TopicSink client classes.
*/
public class TopicSinkClientException extends Exception {
+ @Serial
private static final long serialVersionUID = 1L;
public TopicSinkClientException() {