aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java152
1 files changed, 78 insertions, 74 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 1c85fa97..67adf3b4 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -178,27 +178,50 @@ public interface BusPublisher {
}
+ configureProtocol(topic, protocol, servers, useHttps);
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
+
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
+
+ props = new Properties();
+
+ props.setProperty("Protocol", (useHttps ? "https" : "http"));
+ props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
+
+ this.publisher.setProps(props);
+
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ this.publisher.setHost(servers.get(0));
+ }
+
+ logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
+ }
+
+ private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
+ boolean useHttps) {
+
if (protocol == ProtocolTypeConstants.AAF_AUTH) {
if (servers == null || servers.isEmpty()) {
throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
}
ArrayList<String> dmaapServers = new ArrayList<>();
- if (useHttps) {
- for (String server : servers) {
- dmaapServers.add(server + ":3905");
- }
-
- } else {
- for (String server : servers) {
- dmaapServers.add(server + ":3904");
- }
+ String port = useHttps ? ":3905" : ":3904";
+ for (String server : servers) {
+ dmaapServers.add(server + port);
}
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
} else if (protocol == ProtocolTypeConstants.DME2) {
ArrayList<String> dmaapServers = new ArrayList<>();
dmaapServers.add("0.0.0.0:3904");
@@ -206,36 +229,10 @@ public interface BusPublisher {
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- } else {
- throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
- }
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
- if (useHttps) {
- props.setProperty("Protocol", "https");
} else {
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
+ throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
}
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
}
@Override
@@ -300,38 +297,12 @@ public interface BusPublisher {
super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
- String dme2RouteOffer = null;
- if (busTopicParams.isAdditionalPropsValid()) {
- dme2RouteOffer = busTopicParams.getAdditionalProps().get(
- DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
- }
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
+ ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ : null;
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
+ validateParams(busTopicParams, dme2RouteOffer);
String serviceName = busTopicParams.getServers().get(0);
@@ -366,19 +337,52 @@ public interface BusPublisher {
props.setProperty("MethodType", "POST");
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
+ addAdditionalProps(busTopicParams);
}
this.publisher.setProps(props);
}
+ private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
+ if (busTopicParams.isEnvironmentInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isAftEnvironmentInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isLatitudeInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ }
+ if (busTopicParams.isLongitudeInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ }
+
+ if ((busTopicParams.isPartnerInvalid())
+ && StringUtils.isBlank(dme2RouteOffer)) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+ }
+
+ private void addAdditionalProps(BusTopicParams busTopicParams) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (value != null) {
+ props.setProperty(key, value);
+ }
+ }
+ }
+
private IllegalArgumentException parmException(String topic, String propnm) {
return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ topic + propnm + " property for DME2 in DMaaP");