aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java498
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java343
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java35
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java10
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java4
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java16
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java3
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java210
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java127
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java95
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java39
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java146
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java144
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java112
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java164
16 files changed, 855 insertions, 1092 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 8cc631c8..636dc6e3 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -20,7 +20,26 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Wrapper around libraries to consume from message bus
@@ -40,6 +59,485 @@ public interface BusConsumer {
* close underlying library consumer
*/
public void close();
+
+ /**
+ * BusConsumer that supports server-side filtering.
+ */
+ public interface FilterableBusConsumer extends BusConsumer {
+
+ /**
+ * Sets the server-side filter.
+ *
+ * @param filter new filter value, or {@code null}
+ * @throws IllegalArgumentException if the consumer cannot be built with the new filter
+ */
+ public void setFilter(String filter);
+ }
+
+ /**
+ * Cambria based consumer
+ */
+ public static class CambriaConsumerWrapper implements FilterableBusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+
+ /**
+ * Used to build the consumer.
+ */
+ private final ConsumerBuilder builder;
+
+ /**
+ * Locked while updating {@link #consumer} and {@link #newConsumer}.
+ */
+ private final Object consLocker = new Object();
+
+ /**
+ * Cambria client
+ */
+ private CambriaConsumer consumer;
+
+ /**
+ * Cambria client to use for next fetch
+ */
+ private CambriaConsumer newConsumer = null;
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * Cambria Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws GeneralSecurityException
+ * @throws MalformedURLException
+ */
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
+ boolean useSelfSignedCerts) {
+ this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
+ fetchLimit, useHttps, useSelfSignedCerts);
+ }
+
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+
+ this.fetchTimeout = fetchTimeout;
+
+ this.builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+
+ // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(fetchTimeout + 30000);
+
+ if (useHttps) {
+ builder.usingHttps();
+
+ if (useSelfSignedCerts) {
+ builder.allowSelfSignedCertificates();
+ }
+ }
+
+ if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
+ }
+
+ try {
+ this.consumer = builder.build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException, InterruptedException {
+ try {
+ return getCurrentConsumer().fetch();
+ } catch (final IOException e) {
+ logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
+ this.fetchTimeout);
+ synchronized (this.closeCondition) {
+ this.closeCondition.wait(this.fetchTimeout);
+ }
+
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ getCurrentConsumer().close();
+ }
+
+ private CambriaConsumer getCurrentConsumer() {
+ CambriaConsumer old = null;
+ CambriaConsumer ret;
+
+ synchronized (consLocker) {
+ if (this.newConsumer != null) {
+ // replace old consumer with new consumer
+ old = this.consumer;
+ this.consumer = this.newConsumer;
+ this.newConsumer = null;
+ }
+
+ ret = this.consumer;
+ }
+
+ if (old != null) {
+ old.close();
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void setFilter(String filter) {
+ logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
+ builder.withServerSideFilter(filter);
+
+ try {
+ CambriaConsumer previous;
+ synchronized (consLocker) {
+ previous = this.newConsumer;
+ this.newConsumer = builder.build();
+ }
+
+ if (previous != null) {
+ // there was already a new consumer - close it
+ previous.close();
+ }
+
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ /*
+ * Since an exception occurred, "consumer" still has its old value, thus it should
+ * not be closed at this point.
+ */
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
+ }
+ }
+
+ /**
+ * MR based consumer
+ */
+ public abstract class DmaapConsumerWrapper implements BusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+
+ /**
+ * Name of the "protocol" property.
+ */
+ protected static final String PROTOCOL_PROP = "Protocol";
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * MR Consumer
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param username AAF Login
+ * @param password AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit) throws MalformedURLException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
+ fetchLimit, null, apiKey, apiSecret);
+
+ this.consumer.setUsername(username);
+ this.consumer.setPassword(password);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws InterruptedException, IOException {
+ final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+ if (response == null) {
+ logger.warn("{}: DMaaP NULL response received", this);
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+ return new ArrayList<>();
+ } else {
+ logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
+ response.getResponseMessage());
+
+ if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
+
+ logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+
+ /* fall through */
+ }
+ }
+
+ if (response.getActualMessages() == null) {
+ return new ArrayList<>();
+ } else {
+ return response.getActualMessages();
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
+ }
+ }
+
+ /**
+ * MR based consumer
+ */
+ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
+
+ private final Properties props;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps) throws MalformedURLException {
+
+ super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit);
+
+ // super constructor sets servers = {""} if empty to avoid errors when using DME2
+ if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ props = new Properties();
+
+ if (useHttps) {
+ props.setProperty(PROTOCOL_PROP, "https");
+ this.consumer.setHost(servers.get(0) + ":3905");
+
+ } else {
+ props.setProperty(PROTOCOL_PROP, "http");
+ this.consumer.setHost(servers.get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ logger.info("{}: CREATION", this);
+ }
+
+ @Override
+ public String toString() {
+ final MRConsumerImpl consumer = this.consumer;
+
+ return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
+ + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
+ + consumer.getUsername() + "]";
+ }
+ }
+
+ public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
+
+ private final Properties props;
+
+ public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
+ String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
+
+
+
+ super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit);
+
+
+ final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ }
+ if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ }
+ if (latitude == null || latitude.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ }
+ if (longitude == null || longitude.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty())
+ && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ final String serviceName = servers.get(0);
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ this.consumer.setUsername(dme2Login);
+ this.consumer.setPassword(dme2Password);
+
+ props = new Properties();
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ props.setProperty("username", dme2Login);
+ props.setProperty("password", dme2Password);
+
+ /* These are required, no defaults */
+ props.setProperty("topic", topic);
+
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ if (dme2Partner != null) {
+ props.setProperty("Partner", dme2Partner);
+ }
+ if (dme2RouteOffer != null) {
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ }
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "GET");
+
+ if (useHttps) {
+ props.setProperty(PROTOCOL_PROP, "https");
+
+ } else {
+ props.setProperty(PROTOCOL_PROP, "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+
+ if (additionalProps != null) {
+ for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ logger.info("{}: CREATION", this);
+ }
+
+ private IllegalArgumentException parmException(String topic, String propnm) {
+ return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + topic + propnm + " property for DME2 in DMaaP");
+
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index f6c65ada..9db9131c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -20,6 +20,28 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public interface BusPublisher {
/**
@@ -36,4 +58,325 @@ public interface BusPublisher {
* closes the publisher
*/
public void close();
+
+ /**
+ * Cambria based library publisher
+ */
+ public static class CambriaPublisherWrapper implements BusPublisher {
+
+ private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
+
+ /**
+ * The actual Cambria publisher
+ */
+ @JsonIgnore
+ protected volatile CambriaBatchingPublisher publisher;
+
+ public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ boolean useHttps) {
+ this(servers, topic, apiKey, apiSecret, null, null, useHttps, false);
+ }
+
+ public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String username, String password, boolean useHttps, boolean selfSignedCerts) {
+
+ PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
+
+ builder.usingHosts(servers).onTopic(topic);
+
+ // Set read timeout to 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(30000);
+
+ if (useHttps) {
+ if (selfSignedCerts) {
+ builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
+ } else {
+ builder.withConnectionType(ConnectionType.HTTPS);
+ }
+ }
+
+
+ if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
+ }
+
+ try {
+ this.publisher = builder.build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean send(String partitionId, String message) {
+ if (message == null) {
+ throw new IllegalArgumentException("No message provided");
+ }
+
+ try {
+ this.publisher.send(partitionId, message);
+ } catch (Exception e) {
+ logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ logger.info("{}: CLOSE", this);
+
+ try {
+ this.publisher.close();
+ } catch (Exception e) {
+ logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "CambriaPublisherWrapper []";
+ }
+
+ }
+
+ /**
+ * DmaapClient library wrapper
+ */
+ public abstract class DmaapPublisherWrapper implements BusPublisher {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
+
+ /**
+ * MR based Publisher
+ */
+ protected MRSimplerBatchPublisher publisher;
+ protected Properties props;
+
+ /**
+ * MR Publisher Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param username AAF or DME2 Login
+ * @param password AAF or DME2 Password
+ */
+ public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic,
+ String username, String password, boolean useHttps) {
+
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ if (servers == null || servers.isEmpty()) {
+ throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
+ }
+
+ ArrayList<String> dmaapServers = new ArrayList<>();
+ if (useHttps) {
+ for (String server : servers) {
+ dmaapServers.add(server + ":3905");
+ }
+
+ } else {
+ for (String server : servers) {
+ dmaapServers.add(server + ":3904");
+ }
+ }
+
+
+ this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ } else if (protocol == ProtocolTypeConstants.DME2) {
+ ArrayList<String> dmaapServers = new ArrayList<>();
+ dmaapServers.add("0.0.0.0:3904");
+
+ this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
+
+ this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+ }
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
+
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
+
+ props = new Properties();
+
+ if (useHttps) {
+ props.setProperty("Protocol", "https");
+ } else {
+ props.setProperty("Protocol", "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
+
+ this.publisher.setProps(props);
+
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ this.publisher.setHost(servers.get(0));
+ }
+
+ logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ logger.info("{}: CLOSE", this);
+
+ try {
+ this.publisher.close(1, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean send(String partitionId, String message) {
+ if (message == null) {
+ throw new IllegalArgumentException("No message provided");
+ }
+
+ this.publisher.setPubResponse(new MRPublisherResponse());
+ this.publisher.send(partitionId, message);
+ MRPublisherResponse response = this.publisher.sendBatchWithResponse();
+ if (response != null) {
+ logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
+ + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()="
+ + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag()
+ + ", publisher.getUsername()=" + publisher.getUsername() + "]";
+ }
+ }
+
+ /**
+ * DmaapClient library wrapper
+ */
+ public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
+ /**
+ * MR based Publisher
+ */
+ public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
+ boolean useHttps) {
+
+ super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
+ }
+ }
+
+ public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
+ public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password,
+ String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude,
+ Map<String, String> additionalProps, boolean useHttps) {
+
+ super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
+
+
+
+ String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ }
+ if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ }
+ if (latitude == null || latitude.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ }
+ if (longitude == null || longitude.isEmpty()) {
+ throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty())
+ && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ String serviceName = servers.get(0);
+
+ /* These are required, no defaults */
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ if (dme2Partner != null) {
+ props.setProperty("Partner", dme2Partner);
+ }
+ if (dme2RouteOffer != null) {
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ }
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ // ServiceName also a default, found in additionalProps
+
+ /* These are optional, will default to these values if not set in optionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "POST");
+
+ for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (value != null) {
+ props.setProperty(key, value);
+ }
+ }
+
+ this.publisher.setProps(props);
+ }
+
+ private IllegalArgumentException parmException(String topic, String propnm) {
+ return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + topic + propnm + " property for DME2 in DMaaP");
+
+ }
+ }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java
deleted file mode 100644
index 76adf980..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-/**
- * BusConsumer that supports server-side filtering.
- */
-public interface FilterableBusConsumer extends BusConsumer {
-
- /**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws IllegalArgumentException if the consumer cannot be built with the new filter
- */
- public void setFilter(String filter);
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
index c630a165..3ea7185e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -25,8 +25,6 @@ import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmePublisherWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,11 +101,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
@Override
public void init() {
if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher = new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
- this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
+ this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey,
+ this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
} else {
- this.publisher = new DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, this.password,
- this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
+ this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName,
+ this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
this.additionalProps, this.useHttps);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
index 34dd4e74..fefe6493 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,8 +62,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
@Override
public void init() {
- this.publisher =
- new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
+ this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
null, null, this.useHttps, this.allowSelfSignedCerts);
logger.info("{}: UEB SINK created", this);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index d59c63f5..97ebbd50 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index de5fed6a..8ac41424 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -26,8 +26,6 @@ import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmeConsumerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,15 +138,15 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
@Override
public void init() throws MalformedURLException {
if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
- this.allowSelfSignedCerts);
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
+ this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
} else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
- this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout,
- this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
+ this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
+ this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
} else {
- this.consumer = new DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
+ this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
this.latitude, this.longitude, this.additionalProps, this.useHttps);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index 00e45422..b7a20503 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper;
/**
* This topic source implementation specializes in reading messages over an UEB Bus topic source and
@@ -66,7 +65,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i
*/
@Override
public void init() {
- this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
this.allowSelfSignedCerts);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java
deleted file mode 100644
index fedde284..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.List;
-
-import org.onap.policy.common.endpoints.event.comm.bus.internal.FilterableBusConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Cambria based consumer
- */
-public class CambriaConsumerWrapper implements FilterableBusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
-
- /**
- * Used to build the consumer.
- */
- private final ConsumerBuilder builder;
-
- /**
- * Locked while updating {@link #consumer} and {@link #newConsumer}.
- */
- private final Object consLocker = new Object();
-
- /**
- * Cambria client
- */
- private CambriaConsumer consumer;
-
- /**
- * Cambria client to use for next fetch
- */
- private CambriaConsumer newConsumer = null;
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * Cambria Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws GeneralSecurityException
- * @throws MalformedURLException
- */
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
- boolean useSelfSignedCerts) {
- this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
- useHttps, useSelfSignedCerts);
- }
-
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- boolean useHttps, boolean useSelfSignedCerts) {
-
- this.fetchTimeout = fetchTimeout;
-
- this.builder = new CambriaClientBuilders.ConsumerBuilder();
-
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic).waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit);
-
- // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(fetchTimeout + 30000);
-
- if (useHttps) {
- builder.usingHttps();
-
- if (useSelfSignedCerts) {
- builder.allowSelfSignedCertificates();
- }
- }
-
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException, InterruptedException {
- try {
- return getCurrentConsumer().fetch();
- } catch (final IOException e) {
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
- synchronized (this.closeCondition) {
- this.closeCondition.wait(this.fetchTimeout);
- }
-
- throw e;
- }
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- getCurrentConsumer().close();
- }
-
- private CambriaConsumer getCurrentConsumer() {
- CambriaConsumer old = null;
- CambriaConsumer ret;
-
- synchronized (consLocker) {
- if (this.newConsumer != null) {
- // replace old consumer with new consumer
- old = this.consumer;
- this.consumer = this.newConsumer;
- this.newConsumer = null;
- }
-
- ret = this.consumer;
- }
-
- if (old != null) {
- old.close();
- }
-
- return ret;
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
- builder.withServerSideFilter(filter);
-
- try {
- CambriaConsumer previous;
- synchronized (consLocker) {
- previous = this.newConsumer;
- this.newConsumer = builder.build();
- }
-
- if (previous != null) {
- // there was already a new consumer - close it
- previous.close();
- }
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- /*
- * Since an exception occurred, "consumer" still has its old value, thus it should not
- * be closed at this point.
- */
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java
deleted file mode 100644
index 7902d4be..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.apiClient.http.HttpClient.ConnectionType;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.List;
-
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Cambria based library publisher
- */
-public class CambriaPublisherWrapper implements BusPublisher {
-
- private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
-
- /**
- * The actual Cambria publisher
- */
- @JsonIgnore
- protected volatile CambriaBatchingPublisher publisher;
-
- public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- boolean useHttps) {
- this(servers, topic, apiKey, apiSecret, null, null, useHttps, false);
- }
-
- public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
- String password, boolean useHttps, boolean selfSignedCerts) {
-
- PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- builder.usingHosts(servers).onTopic(topic);
-
- // Set read timeout to 30 seconds (TBD: this should be configurable)
- builder.withSocketTimeout(30000);
-
- if (useHttps) {
- if (selfSignedCerts) {
- builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
- } else {
- builder.withConnectionType(ConnectionType.HTTPS);
- }
- }
-
-
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
- }
-
- try {
- this.publisher = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException("No message provided");
- }
-
- try {
- this.publisher.send(partitionId, message);
- } catch (Exception e) {
- logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
- return false;
- }
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() {
- logger.info("{}: CLOSE", this);
-
- try {
- this.publisher.close();
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
- }
- }
-
-
- @Override
- public String toString() {
- return "CambriaPublisherWrapper []";
- }
-
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java
deleted file mode 100644
index 29fbf1d5..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MR based consumer
- */
-public class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private final Properties props;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String aafLogin,
- String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- boolean useHttps) throws MalformedURLException {
-
- super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if (useHttps) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(servers.get(0) + ":3905");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(servers.get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- final MRConsumerImpl consumer = this.consumer;
-
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java
deleted file mode 100644
index 1308bb36..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-import java.util.List;
-
-/**
- * DmaapClient library wrapper
- */
-public class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
- /**
- * MR based Publisher
- */
- public DmaapAafPublisherWrapper(List<String> servers, String topic, String aafLogin, String aafPassword,
- boolean useHttps) {
-
- super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java
deleted file mode 100644
index 0806c3d3..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.client.impl.MRConsumerImpl;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MR based consumer
- */
-public abstract class DmaapConsumerWrapper implements BusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * Name of the "protocol" property.
- */
- protected static final String PROTOCOL_PROP = "Protocol";
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * MR Consumer
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String username,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit)
- throws MalformedURLException {
-
- this.fetchTimeout = fetchTimeout;
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
- null, apiKey, apiSecret);
-
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
- }
-
- @Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage());
-
- if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
- }
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
- + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
- + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
- + consumer.getUsername() + "]";
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java
deleted file mode 100644
index fd998e2b..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private final Properties props;
-
- public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
- String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
-
-
-
- super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit);
-
-
- final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- final String serviceName = servers.get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
-
- props = new Properties();
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
-
- /* These are required, no defaults */
- props.setProperty("topic", topic);
-
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- if (dme2Partner != null) {
- props.setProperty("Partner", dme2Partner);
- }
- if (dme2RouteOffer != null) {
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
-
- if (useHttps) {
- props.setProperty(PROTOCOL_PROP, "https");
-
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (additionalProps != null) {
- for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
- }
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java
deleted file mode 100644
index e39954ed..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-import java.util.List;
-import java.util.Map;
-
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
- public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password,
- String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean useHttps) {
-
- super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
-
-
-
- String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- String serviceName = servers.get(0);
-
- /* These are required, no defaults */
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- if (dme2Partner != null) {
- props.setProperty("Partner", dme2Partner);
- }
- if (dme2RouteOffer != null) {
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- // ServiceName also a default, found in additionalProps
-
- /* These are optional, will default to these values if not set in optionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "POST");
-
- for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
-
- this.publisher.setProps(props);
- }
-
- private IllegalArgumentException parmException(String topic, String propnm) {
- return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + propnm + " property for DME2 in DMaaP");
-
- }
-}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java
deleted file mode 100644
index 396580e9..00000000
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
-
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-import com.att.nsa.mr.client.response.MRPublisherResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DmaapClient library wrapper
- */
-public abstract class DmaapPublisherWrapper implements BusPublisher {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
-
- /**
- * MR based Publisher
- */
- protected MRSimplerBatchPublisher publisher;
- protected Properties props;
-
- /**
- * MR Publisher Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param username AAF or DME2 Login
- * @param password AAF or DME2 Password
- */
- public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, String username,
- String password, boolean useHttps) {
-
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
- }
-
- ArrayList<String> dmaapServers = new ArrayList<>();
- if (useHttps) {
- for (String server : servers) {
- dmaapServers.add(server + ":3905");
- }
-
- } else {
- for (String server : servers) {
- dmaapServers.add(server + ":3904");
- }
- }
-
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- } else if (protocol == ProtocolTypeConstants.DME2) {
- ArrayList<String> dmaapServers = new ArrayList<>();
- dmaapServers.add("0.0.0.0:3904");
-
- this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
-
- this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- }
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
-
- if (useHttps) {
- props.setProperty("Protocol", "https");
- } else {
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
- }
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() {
- logger.info("{}: CLOSE", this);
-
- try {
- this.publisher.close(1, TimeUnit.SECONDS);
- } catch (Exception e) {
- logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean send(String partitionId, String message) {
- if (message == null) {
- throw new IllegalArgumentException("No message provided");
- }
-
- this.publisher.setPubResponse(new MRPublisherResponse());
- this.publisher.send(partitionId, message);
- MRPublisherResponse response = this.publisher.sendBatchWithResponse();
- if (response != null) {
- logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), response.getResponseMessage());
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
- + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + publisher.getHost()
- + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + ", publisher.getUsername()="
- + publisher.getUsername() + "]";
- }
-}