summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java')
-rw-r--r--policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java229
1 files changed, 202 insertions, 27 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
index 798bf989..b5595b2d 100644
--- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -24,14 +24,20 @@ import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+import org.slf4j.LoggerFactory;
+
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -65,15 +71,21 @@ public interface BusPublisher {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
- String apiSecret)
- throws IllegalArgumentException {
+ String apiSecret, boolean useHttps) throws IllegalArgumentException {
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(servers)
+
+
+ if (useHttps){
+
+ builder.usingHosts(servers)
+ .onTopic(topic)
+ .usingHttps();
+ }
+ else{
+ builder.usingHosts(servers)
.onTopic(topic);
-
- // Only supported in 0.2.4 version
- // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER);
+ }
+
if (apiKey != null && !apiKey.isEmpty() &&
apiSecret != null && !apiSecret.isEmpty()) {
@@ -142,43 +154,105 @@ public interface BusPublisher {
/**
* DmaapClient library wrapper
*/
- public static class DmaapPublisherWrapper implements BusPublisher {
+ public abstract class DmaapPublisherWrapper implements BusPublisher {
/**
* MR based Publisher
*/
protected MRSimplerBatchPublisher publisher;
+ protected Properties props;
- public DmaapPublisherWrapper(List<String> servers, String topic,
- String aafLogin,
- String aafPassword) {
+ /**
+ * MR Publisher Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param username AAF or DME2 Login
+ * @param password AAF or DME2 Password
+ */
+ public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
+ List<String> servers, String topic,
+ String username,
+ String password, boolean useHttps) throws IllegalArgumentException {
+
- ArrayList<String> dmaapServers = new ArrayList<String>();
- for (String server: servers) {
- dmaapServers.add(server + ":3904");
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
}
-
- this.publisher =
- new MRSimplerBatchPublisher.Builder().
- againstUrls(dmaapServers).
- onTopic(topic).
- build();
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ if (servers == null || servers.isEmpty())
+ throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
+
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ if(useHttps){
+ for (String server: servers) {
+ dmaapServers.add(server + ":3905");
+ }
+
+ }
+ else{
+ for (String server: servers) {
+ dmaapServers.add(server + ":3904");
+ }
+ }
+
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ } else if (protocol == ProtocolTypeConstants.DME2) {
+ ArrayList<String> dmaapServers = new ArrayList<String>();
+ dmaapServers.add("0.0.0.0:3904");
+
+ this.publisher =
+ new MRSimplerBatchPublisher.Builder().
+ againstUrls(dmaapServers).
+ onTopic(topic).
+ build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ }
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
- this.publisher.setUsername(aafLogin);
- this.publisher.setPassword(aafPassword);
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
- Properties props = new Properties();
- props.setProperty("Protocol", "http");
+ props = new Properties();
+
+ if(useHttps){
+
+ props.setProperty("Protocol", "https");
+ }
+ else{
+
+ props.setProperty("Protocol", "http");
+
+ }
+
props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
this.publisher.setProps(props);
- this.publisher.setHost(servers.get(0));
+ if (protocol == ProtocolTypeConstants.AAF_AUTH)
+ this.publisher.setHost(servers.get(0));
- if (PolicyLogger.isInfoEnabled())
+ if (PolicyLogger.isInfoEnabled()) {
PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
"CREATION: " + this);
+ PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
+ "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue());
+ }
+
}
/**
@@ -208,7 +282,16 @@ public interface BusPublisher {
if (message == null)
throw new IllegalArgumentException("No message provided");
+ this.publisher.setPubResponse(new MRPublisherResponse());
this.publisher.send(partitionId, message);
+ MRPublisherResponse response = this.publisher.sendBatchWithResponse();
+ if (PolicyLogger.isDebugEnabled() && response != null) {
+ PolicyLogger.debug(DmaapPublisherWrapper.class.getName(),
+ "DMaaP publisher received " + response.getResponseCode() + ": "
+ + response.getResponseMessage());
+
+ }
+
return true;
}
@@ -227,5 +310,97 @@ public interface BusPublisher {
return builder.toString();
}
}
+
+ /**
+ * DmaapClient library wrapper
+ */
+ public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ /**
+ * MR based Publisher
+ */
+ protected MRSimplerBatchPublisher publisher;
+
+ public DmaapAafPublisherWrapper(List<String> servers, String topic,
+ String aafLogin,
+ String aafPassword, boolean useHttps) {
+
+ super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
+ }
+ }
+
+ public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ public DmaapDmePublisherWrapper(List<String> servers, String topic,
+ String username, String password,
+ String environment, String aftEnvironment, String dme2Partner,
+ String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
+
+ super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
+
+
+
+
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
+ } if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ } if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
+ "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
+ PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ /* These are required, no defaults */
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ // ServiceName also a default, found in additionalProps
+
+ /* These are optional, will default to these values if not set in optionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "POST");
+
+ for (String key : additionalProps.keySet()) {
+ String value = additionalProps.get(key);
+
+ if (value != null)
+ props.setProperty(key, value);
+ }
+
+ this.publisher.setProps(props);
+ }
+ }
}