diff options
Diffstat (limited to 'policy-endpoints/src/main/java')
3 files changed, 23 insertions, 9 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 690a6d0b..746798fa 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -322,7 +322,7 @@ public interface BusConsumer { logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage()); - if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) { + if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), response.getResponseMessage()); @@ -443,8 +443,9 @@ public interface BusConsumer { super(busTopicParams); - final String dme2RouteOffer = busTopicParams.getAdditionalProps() - .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() + ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + : null); if (busTopicParams.isEnvironmentInvalid()) { throw parmException(busTopicParams.getTopic(), diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 716ce95b..3365b4ec 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -204,6 +204,8 @@ public interface BusPublisher { this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + } else { + throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); } this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); @@ -361,12 +363,14 @@ public interface BusPublisher { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "POST"); - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); - if (value != null) { - props.setProperty(key, value); + if (value != null) { + props.setProperty(key, value); + } } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index b9463e81..b588d1f3 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -168,7 +168,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase try { this.init(); this.alive = true; - this.busPollerThread = new Thread(this); + this.busPollerThread = makePollerThread(); this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic()); busPollerThread.start(); } catch (Exception e) { @@ -181,6 +181,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return this.alive; } + /** + * Makes a new thread to be used for polling. + * + * @return a new Thread + */ + protected Thread makePollerThread() { + return new Thread(this); + } + @Override public boolean stop() { logger.info("{}: stopping", this); |