diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
2 files changed, 96 insertions, 87 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 1c85fa97..67adf3b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -178,27 +178,50 @@ public interface BusPublisher { } + configureProtocol(topic, protocol, servers, useHttps); + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); + + this.publisher.setUsername(username); + this.publisher.setPassword(password); + + props = new Properties(); + + props.setProperty("Protocol", (useHttps ? "https" : "http")); + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); + + this.publisher.setProps(props); + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + this.publisher.setHost(servers.get(0)); + } + + logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); + } + + private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, + boolean useHttps) { + if (protocol == ProtocolTypeConstants.AAF_AUTH) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); } ArrayList<String> dmaapServers = new ArrayList<>(); - if (useHttps) { - for (String server : servers) { - dmaapServers.add(server + ":3905"); - } - - } else { - for (String server : servers) { - dmaapServers.add(server + ":3904"); - } + String port = useHttps ? ":3905" : ":3904"; + for (String server : servers) { + dmaapServers.add(server + port); } this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { ArrayList<String> dmaapServers = new ArrayList<>(); dmaapServers.add("0.0.0.0:3904"); @@ -206,36 +229,10 @@ public interface BusPublisher { this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - if (useHttps) { - props.setProperty("Protocol", "https"); } else { - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); + throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); } @Override @@ -300,38 +297,12 @@ public interface BusPublisher { super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); - String dme2RouteOffer = null; - if (busTopicParams.isAdditionalPropsValid()) { - dme2RouteOffer = busTopicParams.getAdditionalProps().get( - DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - } - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() + ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + : null; - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } + validateParams(busTopicParams, dme2RouteOffer); String serviceName = busTopicParams.getServers().get(0); @@ -366,19 +337,52 @@ public interface BusPublisher { props.setProperty("MethodType", "POST"); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } + addAdditionalProps(busTopicParams); } this.publisher.setProps(props); } + private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { + if (busTopicParams.isEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isAftEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isLatitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (busTopicParams.isLongitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((busTopicParams.isPartnerInvalid()) + && StringUtils.isBlank(dme2RouteOffer)) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + } + + private void addAdditionalProps(BusTopicParams busTopicParams) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (value != null) { + props.setProperty(key, value); + } + } + } + private IllegalArgumentException parmException(String topic, String propnm) { return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + propnm + " property for DME2 in DMaaP"); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 98e30e27..0953465b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; @@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase public void run() { while (this.alive) { try { - for (String event : this.consumer.fetch()) { - synchronized (this) { - this.recentEvents.add(event); - } - - NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); - - broadcast(event); - - if (!this.alive) { - break; - } - } + fetchAllMessages(); } catch (Exception e) { logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); } @@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: exiting thread", this); } + private void fetchAllMessages() throws InterruptedException, IOException { + for (String event : this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); + + broadcast(event); + + if (!this.alive) { + return; + } + } + } + @Override public boolean offer(String event) { if (!this.alive) { |