aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java14
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java11
3 files changed, 23 insertions, 9 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 690a6d0b..746798fa 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -322,7 +322,7 @@ public interface BusConsumer {
logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
response.getResponseMessage());
- if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
+ if (!"200".equals(response.getResponseCode())) {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
response.getResponseMessage());
@@ -443,8 +443,9 @@ public interface BusConsumer {
super(busTopicParams);
- final String dme2RouteOffer = busTopicParams.getAdditionalProps()
- .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
+ ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ : null);
if (busTopicParams.isEnvironmentInvalid()) {
throw parmException(busTopicParams.getTopic(),
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 716ce95b..3365b4ec 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -204,6 +204,8 @@ public interface BusPublisher {
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+ } else {
+ throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
}
this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
@@ -361,12 +363,14 @@ public interface BusPublisher {
props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "POST");
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
- if (value != null) {
- props.setProperty(key, value);
+ if (value != null) {
+ props.setProperty(key, value);
+ }
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index b9463e81..b588d1f3 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -168,7 +168,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
try {
this.init();
this.alive = true;
- this.busPollerThread = new Thread(this);
+ this.busPollerThread = makePollerThread();
this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
busPollerThread.start();
} catch (Exception e) {
@@ -181,6 +181,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return this.alive;
}
+ /**
+ * Makes a new thread to be used for polling.
+ *
+ * @return a new Thread
+ */
+ protected Thread makePollerThread() {
+ return new Thread(this);
+ }
+
@Override
public boolean stop() {
logger.info("{}: stopping", this);