From 9f8be335925299059992bd46285a1b633d518051 Mon Sep 17 00:00:00 2001 From: mmis Date: Mon, 30 Jul 2018 17:04:50 +0100 Subject: Copy policy-endpoints from drools-pdp to common Removed changes made in commit b40acf2d244058c162a8597968e59f2708e6abf4 that went beyond the scope of POLICY-967 Issue-ID: POLICY-967 Change-Id: Ibbf78540dec8bf8601a62dacc8c7056d43f70ba1 Signed-off-by: mmis --- .../common/endpoints/event/comm/TopicEndpoint.java | 438 ++++++++++++++++++ .../endpoints/event/comm/bus/DmaapTopicSink.java | 6 + .../event/comm/bus/DmaapTopicSinkFactory.java | 289 ++++++++++++ .../endpoints/event/comm/bus/DmaapTopicSource.java | 5 + .../event/comm/bus/DmaapTopicSourceFactory.java | 365 +++++++++++++++ .../endpoints/event/comm/bus/NoopTopicSink.java | 5 + .../event/comm/bus/NoopTopicSinkFactory.java | 149 ++++++ .../endpoints/event/comm/bus/UebTopicSink.java | 4 + .../event/comm/bus/UebTopicSinkFactory.java | 193 ++++++++ .../endpoints/event/comm/bus/UebTopicSource.java | 4 + .../event/comm/bus/UebTopicSourceFactory.java | 245 ++++++++++ .../bus/impl/IndexedDmaapTopicSinkFactory.java | 325 -------------- .../bus/impl/IndexedDmaapTopicSourceFactory.java | 401 ----------------- .../comm/bus/impl/IndexedNoopTopicSinkFactory.java | 185 -------- .../comm/bus/impl/IndexedUebTopicSinkFactory.java | 229 ---------- .../bus/impl/IndexedUebTopicSourceFactory.java | 280 ------------ .../event/comm/bus/internal/BusConsumer.java | 498 +++++++++++++++++++++ .../event/comm/bus/internal/BusPublisher.java | 343 ++++++++++++++ .../comm/bus/internal/FilterableBusConsumer.java | 35 -- .../comm/bus/internal/InlineDmaapTopicSink.java | 10 +- .../comm/bus/internal/InlineUebTopicSink.java | 4 +- .../bus/internal/SingleThreadedBusTopicSource.java | 1 + .../internal/SingleThreadedDmaapTopicSource.java | 16 +- .../bus/internal/SingleThreadedUebTopicSource.java | 3 +- .../bus/internal/impl/CambriaConsumerWrapper.java | 210 --------- .../bus/internal/impl/CambriaPublisherWrapper.java | 127 ------ .../bus/internal/impl/DmaapAafConsumerWrapper.java | 95 ---- .../internal/impl/DmaapAafPublisherWrapper.java | 39 -- .../bus/internal/impl/DmaapConsumerWrapper.java | 146 ------ .../bus/internal/impl/DmaapDmeConsumerWrapper.java | 144 ------ .../internal/impl/DmaapDmePublisherWrapper.java | 112 ----- .../bus/internal/impl/DmaapPublisherWrapper.java | 164 ------- .../event/comm/impl/ProxyTopicEndpointManager.java | 482 -------------------- 33 files changed, 2558 insertions(+), 2994 deletions(-) delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java delete mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index 03e6776a..e7a21ca1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -20,6 +20,9 @@ package org.onap.policy.common.endpoints.event.comm; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -30,6 +33,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into @@ -37,6 +42,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; */ public interface TopicEndpoint extends Startable, Lockable { + /** + * singleton for global access + */ + public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + /** * Add Topic Sources to the communication infrastructure initialized per properties * @@ -227,3 +237,431 @@ public interface TopicEndpoint extends Startable, Lockable { */ public List getNoopTopicSinks(); } + + +/* + * ----------------- implementation ------------------- + */ + +/** + * This implementation of the Topic Endpoint Manager, proxies operations to appropriate + * implementations according to the communication infrastructure that are supported + */ +class ProxyTopicEndpointManager implements TopicEndpoint { + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + @Override + public List addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + final List sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + final List sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List getTopicSources() { + + final List sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List getTopicSinks() { + + final List sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @JsonIgnore + @Override + public List getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over + * locks. + */ + synchronized (this) { + this.alive = false; + } + + final List endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List getEndpoints() { + final List endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + NoopTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) { + return true; + } + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) { + return true; + } + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public List getTopicSources(List topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) { + sources.add(uebSource); + } + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) { + sources.add(dmaapSource); + } + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List getTopicSinks(List topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) { + sinks.add(uebSink); + } + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) { + sinks.add(dmaapSink); + } + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) { + sinks.add(noopSink); + } + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { + + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + private IllegalArgumentException parmException(String topicName) { + return new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { + if (commType == null) { + throw parmException(topicName); + } + + if (topicName == null) { + throw parmException(topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public List getTopicSinks(String topicName) { + if (topicName == null) { + throw parmException(topicName); + } + + final List sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logNoSink(topicName, e); + } + + return sinks; + } + + private void logNoSink(String topicName, Exception ex) { + logger.debug("No sink for topic: {}", topicName, ex); + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java index 845945cd..ba219322 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java @@ -21,4 +21,10 @@ package org.onap.policy.common.endpoints.event.comm.bus; public interface DmaapTopicSink extends BusTopicSink { + + /** + * Factory of UebTopicWriter for instantiation and management purposes + */ + + public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 2e3ecf29..26e8d413 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -20,10 +20,18 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * DMAAP Topic Sink Factory */ @@ -133,3 +141,284 @@ public interface DmaapTopicSinkFactory { */ public void destroy(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of DMAAP Reader Topics indexed by topic name + */ +class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); + + /** + * DMAAP Topic Name Index + */ + protected HashMap dmaapTopicWriters = new HashMap<>(); + + @Override + public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, + password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps, + useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicWriters.put(topic, dmaapTopicSink); + } + return dmaapTopicSink; + } + } + + @Override + public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, + String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, + password, partitionKey, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicWriters.put(topic, dmaapTopicSink); + } + return dmaapTopicSink; + } + } + + @Override + public DmaapTopicSink build(List servers, String topic) { + return this.build(servers, topic, null, null, null, null, null, true, false, false); + } + + @Override + public List build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Sink", this); + return new ArrayList<>(); + } + + List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List newDmaapTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + continue; + } + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, + partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, + dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + + newDmaapTopicSinks.add(dmaapTopicSink); + } + return newDmaapTopicSinks; + } + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSink dmaapTopicWriter; + synchronized (this) { + if (!dmaapTopicWriters.containsKey(topic)) { + return; + } + + dmaapTopicWriter = dmaapTopicWriters.remove(topic); + } + + dmaapTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List writers = this.inventory(); + for (DmaapTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.dmaapTopicWriters.clear(); + } + } + + @Override + public DmaapTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List inventory() { + return new ArrayList<>(this.dmaapTopicWriters.values()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedDmaapTopicSinkFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java index b50c752f..ffed5bab 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java @@ -21,4 +21,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; public interface DmaapTopicSource extends BusTopicSource { + + /** + * factory for managing and tracking DMAAP sources + */ + public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index adfb4b42..96ab6c63 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -20,10 +20,18 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * DMAAP Topic Source Factory */ @@ -156,3 +164,360 @@ public interface DmaapTopicSourceFactory { */ public List inventory(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of DMAAP Source Topics indexed by topic name + */ + +class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); + + /** + * DMaaP Topic Name Index + */ + protected HashMap dmaapTopicSources = new HashMap<>(); + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, String latitude, String longitude, + Map additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, + userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment, + aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicSources.put(topic, dmaapTopicSource); + } + + return dmaapTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret, String userName, + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("DMaaP Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = + new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + + if (managed) { + dmaapTopicSources.put(topic, dmaapTopicSource); + } + + return dmaapTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Source", this); + return new ArrayList<>(); + } + List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List dmaapTopicSourceLst = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + + String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + + if (servers == null || servers.isEmpty()) { + + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + + DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, + aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment, + dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed, + useHttps, allowSelfSignedCerts); + + dmaapTopicSourceLst.add(uebTopicSource); + } + } + return dmaapTopicSourceLst; + } + + /** + * {@inheritDoc} + * + * @throws IllegalArgumentException + */ + @Override + public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret) { + return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false); + } + + /** + * {@inheritDoc} + * + * @throws IllegalArgumentException + */ + @Override + public DmaapTopicSource build(List servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSource uebTopicSource; + + synchronized (this) { + if (!dmaapTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = dmaapTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } else { + throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List inventory() { + return new ArrayList<>(this.dmaapTopicSources.values()); + } + + @Override + public void destroy() { + List readers = this.inventory(); + for (DmaapTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.dmaapTopicSources.clear(); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedDmaapTopicSourceFactory []"); + return builder.toString(); + } + +} + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java index c6cbf343..6f9f0adc 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java @@ -32,6 +32,11 @@ import org.slf4j.LoggerFactory; */ public class NoopTopicSink extends TopicBase implements TopicSink { + /** + * factory + */ + public static final NoopTopicSinkFactory factory = new IndexedNoopTopicSinkFactory(); + /** * logger */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java index c555d94f..ee1672d7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -20,9 +20,16 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Noop Topic Sink Factory */ @@ -80,3 +87,145 @@ public interface NoopTopicSinkFactory { */ public void destroy(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of noop sinks + */ +class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap noopTopicSinks = new HashMap<>(); + + @Override + public List build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List newSinks = new ArrayList<>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + servers = "noop"; + } + + final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List servers, String topic, boolean managed) { + + List noopSinkServers = servers; + if (noopSinkServers == null) { + noopSinkServers = new ArrayList<>(); + } + + if (noopSinkServers.isEmpty()) { + noopSinkServers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); + + if (managed) { + this.noopTopicSinks.put(topic, sink); + } + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java index 0e9398de..b6e4acbe 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java @@ -25,4 +25,8 @@ package org.onap.policy.common.endpoints.event.comm.bus; */ public interface UebTopicSink extends BusTopicSink { + /** + * Factory of UEB Topic Sinks for instantiation and management purposes + */ + public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java index 37920635..a522e2c5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -20,9 +20,17 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UEB Topic Sink Factory */ @@ -96,3 +104,188 @@ public interface UebTopicSinkFactory { */ public void destroy(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of UEB Reader Topics indexed by topic name + */ +class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * UEB Topic Name Index + */ + protected HashMap uebTopicSinks = new HashMap<>(); + + @Override + public UebTopicSink build(List servers, String topic, String apiKey, String apiSecret, String partitionKey, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } + + UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey, + useHttps, allowSelfSignedCerts); + + if (managed) { + uebTopicSinks.put(topic, uebTopicWriter); + } + + return uebTopicWriter; + } + } + + + @Override + public UebTopicSink build(List servers, String topic) { + return this.build(servers, topic, null, null, null, true, false, false); + } + + + @Override + public List build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for UEB Sink", this); + return new ArrayList<>(); + } + + List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List newUebTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed, + useHttps, allowSelfSignedCerts); + newUebTopicSinks.add(uebTopicWriter); + } + return newUebTopicSinks; + } + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSink uebTopicWriter; + synchronized (this) { + if (!uebTopicSinks.containsKey(topic)) { + return; + } + + uebTopicWriter = uebTopicSinks.remove(topic); + } + + uebTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List writers = this.inventory(); + for (UebTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.uebTopicSinks.clear(); + } + } + + @Override + public UebTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } else { + throw new IllegalStateException("UebTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List inventory() { + return new ArrayList<>(this.uebTopicSinks.values()); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSinkFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java index db14800c..25bbce62 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java @@ -26,4 +26,8 @@ package org.onap.policy.common.endpoints.event.comm.bus; */ public interface UebTopicSource extends BusTopicSource { + /** + * factory for managing and tracking UEB readers + */ + public static UebTopicSourceFactory factory = new IndexedUebTopicSourceFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index d3d632e0..c6cf3095 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -20,9 +20,17 @@ package org.onap.policy.common.endpoints.event.comm.bus; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * UEB Topic Source Factory */ @@ -112,3 +120,240 @@ public interface UebTopicSourceFactory { */ public List inventory(); } + + +/* ------------- implementation ----------------- */ + +/** + * Factory of UEB Source Topics indexed by topic name + */ +class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); + + /** + * UEB Topic Name Index + */ + protected HashMap uebTopicSources = new HashMap<>(); + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, + boolean useHttps, boolean allowSelfSignedCerts) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } + + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + + if (managed) { + uebTopicSources.put(topic, uebTopicSource); + } + + return uebTopicSource; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for UEB Source", this); + return new ArrayList<>(); + } + List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List newUebTopicSources = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); + newUebTopicSources.add(uebTopicSource); + } + } + return newUebTopicSources; + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List servers, String topic, String apiKey, String apiSecret) { + + return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, + UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource build(List servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSource uebTopicSource; + + synchronized (this) { + if (!uebTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = uebTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public UebTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } else { + throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List inventory() { + return new ArrayList<>(this.uebTopicSources.values()); + } + + @Override + public void destroy() { + List readers = this.inventory(); + for (UebTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.uebTopicSources.clear(); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSourceFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java deleted file mode 100644 index e252988e..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSinkFactory.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Reader Topics indexed by topic name - */ -public class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - private static final IndexedDmaapTopicSinkFactory instance = new IndexedDmaapTopicSinkFactory(); - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); - - /** - * DMAAP Topic Name Index - */ - protected HashMap dmaapTopicWriters = new HashMap<>(); - - /** - * Get the singleton instance. - * - * @return the instance - */ - public static IndexedDmaapTopicSinkFactory getInstance() { - return instance; - } - - private IndexedDmaapTopicSinkFactory() {} - - @Override - public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps, - useHttps, allowSelfSignedCerts); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); - } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, useHttps, allowSelfSignedCerts); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); - } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List servers, String topic) { - return this.build(servers, topic, null, null, null, null, null, true, false, false); - } - - @Override - public List build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Sink", this); - return new ArrayList<>(); - } - - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List newDmaapTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : writeTopicList) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - continue; - } - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - - /* DME2 Properties */ - - String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - String dme2EpReadTimeoutMs = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, - partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, - dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); - - newDmaapTopicSinks.add(dmaapTopicSink); - } - return newDmaapTopicSinks; - } - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSink dmaapTopicWriter; - synchronized (this) { - if (!dmaapTopicWriters.containsKey(topic)) { - return; - } - - dmaapTopicWriter = dmaapTopicWriters.remove(topic); - } - - dmaapTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List writers = this.inventory(); - for (DmaapTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.dmaapTopicWriters.clear(); - } - } - - @Override - public DmaapTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.dmaapTopicWriters.values()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSinkFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java deleted file mode 100644 index 0e469a1d..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedDmaapTopicSourceFactory.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of DMAAP Source Topics indexed by topic name - */ - -public class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - private static final IndexedDmaapTopicSourceFactory instance = new IndexedDmaapTopicSourceFactory(); - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); - - /** - * DMaaP Topic Name Index - */ - protected HashMap dmaapTopicSources = new HashMap<>(); - - /** - * Get the singleton instance. - * - * @return the instance - */ - public static IndexedDmaapTopicSourceFactory getInstance() { - return instance; - } - - private IndexedDmaapTopicSourceFactory() {} - - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String partner, String latitude, String longitude, - Map additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } - - DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, - userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment, - aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); - - if (managed) { - dmaapTopicSources.put(topic, dmaapTopicSource); - } - - return dmaapTopicSource; - } - } - - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("DMaaP Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } - - DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, - consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); - - if (managed) { - dmaapTopicSources.put(topic, dmaapTopicSource); - } - - return dmaapTopicSource; - } - } - - /** - * {@inheritDoc} - */ - @Override - public List build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Source", this); - return new ArrayList<>(); - } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List dmaapTopicSourceLst = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - String dme2EpReadTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - - if (servers == null || servers.isEmpty()) { - - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, - aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment, - dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed, - useHttps, allowSelfSignedCerts); - - dmaapTopicSourceLst.add(uebTopicSource); - } - } - return dmaapTopicSourceLst; - } - - /** - * {@inheritDoc} - * - * @throws IllegalArgumentException - */ - @Override - public DmaapTopicSource build(List servers, String topic, String apiKey, String apiSecret) { - return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, - DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false); - } - - /** - * {@inheritDoc} - * - * @throws IllegalArgumentException - */ - @Override - public DmaapTopicSource build(List servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * {@inheritDoc} - */ - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSource uebTopicSource; - - synchronized (this) { - if (!dmaapTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = dmaapTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } else { - throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.dmaapTopicSources.values()); - } - - @Override - public void destroy() { - List readers = this.inventory(); - for (DmaapTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.dmaapTopicSources.clear(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSourceFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java deleted file mode 100644 index 78bfd46c..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedNoopTopicSinkFactory.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSinkFactory; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of noop sinks - */ -public class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - private static final IndexedNoopTopicSinkFactory instance = new IndexedNoopTopicSinkFactory(); - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * noop topic sinks map - */ - protected HashMap noopTopicSinks = new HashMap<>(); - - /** - * Get the singleton instance. - * - * @return the instance - */ - public static IndexedNoopTopicSinkFactory getInstance() { - return instance; - } - - private IndexedNoopTopicSinkFactory() {} - - @Override - public List build(Properties properties) { - - final String sinkTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - final List sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - final List newSinks = new ArrayList<>(); - synchronized (this) { - for (final String topic : sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - servers = "noop"; - } - - final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - final NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List servers, String topic, boolean managed) { - - List noopSinkServers = servers; - if (noopSinkServers == null) { - noopSinkServers = new ArrayList<>(); - } - - if (noopSinkServers.isEmpty()) { - noopSinkServers.add("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } - - final NoopTopicSink sink = new NoopTopicSink(noopSinkServers, topic); - - if (managed) { - this.noopTopicSinks.put(topic, sink); - } - - return sink; - } - } - - @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - NoopTopicSink noopSink; - synchronized (this) { - if (!this.noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = this.noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); - } - - @Override - public void destroy() { - final List sinks = this.inventory(); - for (final NoopTopicSink sink : sinks) { - sink.shutdown(); - } - - synchronized (this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (this.noopTopicSinks.containsKey(topic)) { - return this.noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public List inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java deleted file mode 100644 index 19bdc7b2..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSinkFactory.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSinkFactory; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Reader Topics indexed by topic name - */ -public class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { - - private static final IndexedUebTopicSinkFactory instance = new IndexedUebTopicSinkFactory(); - - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * UEB Topic Name Index - */ - protected HashMap uebTopicSinks = new HashMap<>(); - - /** - * Get the singleton instance. - * - * @return the instance - */ - public static IndexedUebTopicSinkFactory getInstance() { - return instance; - } - - private IndexedUebTopicSinkFactory() {} - - @Override - public UebTopicSink build(List servers, String topic, String apiKey, String apiSecret, String partitionKey, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } - - UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey, - useHttps, allowSelfSignedCerts); - - if (managed) { - uebTopicSinks.put(topic, uebTopicWriter); - } - - return uebTopicWriter; - } - } - - - @Override - public UebTopicSink build(List servers, String topic) { - return this.build(servers, topic, null, null, null, true, false, false); - } - - - @Override - public List build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for UEB Sink", this); - return new ArrayList<>(); - } - - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List newUebTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : writeTopicList) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed, - useHttps, allowSelfSignedCerts); - newUebTopicSinks.add(uebTopicWriter); - } - return newUebTopicSinks; - } - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSink uebTopicWriter; - synchronized (this) { - if (!uebTopicSinks.containsKey(topic)) { - return; - } - - uebTopicWriter = uebTopicSinks.remove(topic); - } - - uebTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List writers = this.inventory(); - for (UebTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.uebTopicSinks.clear(); - } - } - - @Override - public UebTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); - } - - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSinkFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java deleted file mode 100644 index 5363a30d..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/impl/IndexedUebTopicSourceFactory.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSourceFactory; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory of UEB Source Topics indexed by topic name - */ -public class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - private static final IndexedUebTopicSourceFactory instance = new IndexedUebTopicSourceFactory(); - - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index - */ - protected HashMap uebTopicSources = new HashMap<>(); - - /** - * Get the singleton instance. - * - * @return the instance - */ - public static IndexedUebTopicSourceFactory getInstance() { - return instance; - } - - private IndexedUebTopicSourceFactory() {} - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } - - UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, - consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); - - if (managed) { - uebTopicSources.put(topic, uebTopicSource); - } - - return uebTopicSource; - } - } - - /** - * {@inheritDoc} - */ - @Override - public List build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List newUebTopicSources = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopicList) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, - consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); - newUebTopicSources.add(uebTopicSource); - } - } - return newUebTopicSources; - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, String topic, String apiKey, String apiSecret) { - - return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource build(List servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * {@inheritDoc} - */ - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized (this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - /** - * {@inheritDoc} - */ - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public void destroy() { - List readers = this.inventory(); - for (UebTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.uebTopicSources.clear(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSourceFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 8cc631c8..636dc6e3 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -20,7 +20,26 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.response.MRConsumerResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Wrapper around libraries to consume from message bus @@ -40,6 +59,485 @@ public interface BusConsumer { * close underlying library consumer */ public void close(); + + /** + * BusConsumer that supports server-side filtering. + */ + public interface FilterableBusConsumer extends BusConsumer { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws IllegalArgumentException if the consumer cannot be built with the new filter + */ + public void setFilter(String filter); + } + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements FilterableBusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Used to build the consumer. + */ + private final ConsumerBuilder builder; + + /** + * Locked while updating {@link #consumer} and {@link #newConsumer}. + */ + private final Object consLocker = new Object(); + + /** + * Cambria client + */ + private CambriaConsumer consumer; + + /** + * Cambria client to use for next fetch + */ + private CambriaConsumer newConsumer = null; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, + boolean useSelfSignedCerts) { + this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, + fetchLimit, useHttps, useSelfSignedCerts); + } + + public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, + String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { + + this.fetchTimeout = fetchTimeout; + + this.builder = new CambriaClientBuilders.ConsumerBuilder(); + + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + + // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(fetchTimeout + 30000); + + if (useHttps) { + builder.usingHttps(); + + if (useSelfSignedCerts) { + builder.allowSelfSignedCertificates(); + } + } + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Iterable fetch() throws IOException, InterruptedException { + try { + return getCurrentConsumer().fetch(); + } catch (final IOException e) { + logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), + this.fetchTimeout); + synchronized (this.closeCondition) { + this.closeCondition.wait(this.fetchTimeout); + } + + throw e; + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + getCurrentConsumer().close(); + } + + private CambriaConsumer getCurrentConsumer() { + CambriaConsumer old = null; + CambriaConsumer ret; + + synchronized (consLocker) { + if (this.newConsumer != null) { + // replace old consumer with new consumer + old = this.consumer; + this.consumer = this.newConsumer; + this.newConsumer = null; + } + + ret = this.consumer; + } + + if (old != null) { + old.close(); + } + + return ret; + } + + @Override + public void setFilter(String filter) { + logger.info("{}: setting DMAAP server-side filter: {}", this, filter); + builder.withServerSideFilter(filter); + + try { + CambriaConsumer previous; + synchronized (consLocker) { + previous = this.newConsumer; + this.newConsumer = builder.build(); + } + + if (previous != null) { + // there was already a new consumer - close it + previous.close(); + } + + } catch (MalformedURLException | GeneralSecurityException e) { + /* + * Since an exception occurred, "consumer" still has its old value, thus it should + * not be closed at this point. + */ + throw new IllegalArgumentException(e); + } + } + + @Override + public String toString() { + return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; + } + } + + /** + * MR based consumer + */ + public abstract class DmaapConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + + /** + * Name of the "protocol" property. + */ + protected static final String PROTOCOL_PROP = "Protocol"; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param username AAF Login + * @param password AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, + String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit) throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, + fetchLimit, null, apiKey, apiSecret); + + this.consumer.setUsername(username); + this.consumer.setPassword(password); + } + + @Override + public Iterable fetch() throws InterruptedException, IOException { + final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + if (response == null) { + logger.warn("{}: DMaaP NULL response received", this); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + return new ArrayList<>(); + } else { + logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), + response.getResponseMessage()); + + if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) { + + logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + + /* fall through */ + } + } + + if (response.getActualMessages() == null) { + return new ArrayList<>(); + } else { + return response.getActualMessages(); + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; + } + } + + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); + + private final Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, + String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps) throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if (useHttps) { + props.setProperty(PROTOCOL_PROP, "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } else { + props.setProperty(PROTOCOL_PROP, "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + logger.info("{}: CREATION", this); + } + + @Override + public String toString() { + final MRConsumerImpl consumer = this.consumer; + + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() + + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" + + consumer.getUsername() + "]"; + } + } + + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + + private final Properties props; + + public DmaapDmeConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, + String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, + String longitude, Map additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, + fetchTimeout, fetchLimit); + + + final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (latitude == null || latitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (longitude == null || longitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + final String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) { + props.setProperty("Partner", dme2Partner); + } + if (dme2RouteOffer != null) { + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if (useHttps) { + props.setProperty(PROTOCOL_PROP, "https"); + + } else { + props.setProperty(PROTOCOL_PROP, "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for (Map.Entry entry : additionalProps.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + + private IllegalArgumentException parmException(String topic, String propnm) { + return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + propnm + " property for DME2 in DMaaP"); + + } + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index f6c65ada..9db9131c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -20,6 +20,28 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import com.att.nsa.apiClient.http.HttpClient.ConnectionType; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public interface BusPublisher { /** @@ -36,4 +58,325 @@ public interface BusPublisher { * closes the publisher */ public void close(); + + /** + * Cambria based library publisher + */ + public static class CambriaPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); + + /** + * The actual Cambria publisher + */ + @JsonIgnore + protected volatile CambriaBatchingPublisher publisher; + + public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, + boolean useHttps) { + this(servers, topic, apiKey, apiSecret, null, null, useHttps, false); + } + + public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, + String username, String password, boolean useHttps, boolean selfSignedCerts) { + + PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); + + builder.usingHosts(servers).onTopic(topic); + + // Set read timeout to 30 seconds (TBD: this should be configurable) + builder.withSocketTimeout(30000); + + if (useHttps) { + if (selfSignedCerts) { + builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); + } else { + builder.withConnectionType(ConnectionType.HTTPS); + } + } + + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { + builder.authenticatedByHttp(username, password); + } + + try { + this.publisher = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + + try { + this.publisher.send(partitionId, message); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try { + this.publisher.close(); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + + @Override + public String toString() { + return "CambriaPublisherWrapper []"; + } + + } + + /** + * DmaapClient library wrapper + */ + public abstract class DmaapPublisherWrapper implements BusPublisher { + + private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); + + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + protected Properties props; + + /** + * MR Publisher Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param username AAF or DME2 Login + * @param password AAF or DME2 Password + */ + public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List servers, String topic, + String username, String password, boolean useHttps) { + + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); + } + + ArrayList dmaapServers = new ArrayList<>(); + if (useHttps) { + for (String server : servers) { + dmaapServers.add(server + ":3905"); + } + + } else { + for (String server : servers) { + dmaapServers.add(server + ":3904"); + } + } + + + this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { + ArrayList dmaapServers = new ArrayList<>(); + dmaapServers.add("0.0.0.0:3904"); + + this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + } + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); + + this.publisher.setUsername(username); + this.publisher.setPassword(password); + + props = new Properties(); + + if (useHttps) { + props.setProperty("Protocol", "https"); + } else { + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); + + this.publisher.setProps(props); + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + this.publisher.setHost(servers.get(0)); + } + + logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + logger.info("{}: CLOSE", this); + + try { + this.publisher.close(1, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean send(String partitionId, String message) { + if (message == null) { + throw new IllegalArgumentException("No message provided"); + } + + this.publisher.setPubResponse(new MRPublisherResponse()); + this.publisher.send(partitionId, message); + MRPublisherResponse response = this.publisher.sendBatchWithResponse(); + if (response != null) { + logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), + response.getResponseMessage()); + } + + return true; + } + + @Override + public String toString() { + return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() + + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + + publisher.getHost() + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + + ", publisher.getUsername()=" + publisher.getUsername() + "]"; + } + } + + /** + * DmaapClient library wrapper + */ + public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + /** + * MR based Publisher + */ + public DmaapAafPublisherWrapper(List servers, String topic, String aafLogin, String aafPassword, + boolean useHttps) { + + super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); + } + } + + public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + public DmaapDmePublisherWrapper(List servers, String topic, String username, String password, + String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude, + Map additionalProps, boolean useHttps) { + + super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); + + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (latitude == null || latitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (longitude == null || longitude.isEmpty()) { + throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + /* These are required, no defaults */ + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + if (dme2Partner != null) { + props.setProperty("Partner", dme2Partner); + } + if (dme2RouteOffer != null) { + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + // ServiceName also a default, found in additionalProps + + /* These are optional, will default to these values if not set in optionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "POST"); + + for (Map.Entry entry : additionalProps.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (value != null) { + props.setProperty(key, value); + } + } + + this.publisher.setProps(props); + } + + private IllegalArgumentException parmException(String topic, String propnm) { + return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + propnm + " property for DME2 in DMaaP"); + + } + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java deleted file mode 100644 index 76adf980..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/FilterableBusConsumer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -/** - * BusConsumer that supports server-side filtering. - */ -public interface FilterableBusConsumer extends BusConsumer { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index c630a165..3ea7185e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -25,8 +25,6 @@ import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmePublisherWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,11 +101,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop @Override public void init() { if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - this.userName, this.password, this.useHttps, this.allowSelfSignedCerts); + this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, + this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts); } else { - this.publisher = new DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, this.password, - this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, + this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, + this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, this.additionalProps, this.useHttps); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index 34dd4e74..fefe6493 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -24,7 +24,6 @@ import java.util.List; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,8 +62,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi @Override public void init() { - this.publisher = - new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, null, null, this.useHttps, this.allowSelfSignedCerts); logger.info("{}: UEB SINK created", this); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index d59c63f5..97ebbd50 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.utils.network.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index de5fed6a..8ac41424 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -26,8 +26,6 @@ import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmeConsumerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,15 +138,15 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource @Override public void init() throws MalformedURLException { if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, - this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, + this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout, - this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, + this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); } else { - this.consumer = new DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey, + this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, this.additionalProps, this.useHttps); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index 00e45422..b7a20503 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -24,7 +24,6 @@ import java.util.List; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaConsumerWrapper; /** * This topic source implementation specializes in reading messages over an UEB Bus topic source and @@ -66,7 +65,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i */ @Override public void init() { - this.consumer = new CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, + this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java deleted file mode 100644 index fedde284..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaConsumerWrapper.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.List; - -import org.onap.policy.common.endpoints.event.comm.bus.internal.FilterableBusConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Cambria based consumer - */ -public class CambriaConsumerWrapper implements FilterableBusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); - - /** - * Used to build the consumer. - */ - private final ConsumerBuilder builder; - - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** - * Cambria client - */ - private CambriaConsumer consumer; - - /** - * Cambria client to use for next fetch - */ - private CambriaConsumer newConsumer = null; - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * Cambria Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws GeneralSecurityException - * @throws MalformedURLException - */ - public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean useSelfSignedCerts) { - this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, - useHttps, useSelfSignedCerts); - } - - public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, String username, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - boolean useHttps, boolean useSelfSignedCerts) { - - this.fetchTimeout = fetchTimeout; - - this.builder = new CambriaClientBuilders.ConsumerBuilder(); - - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic).waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit); - - // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(fetchTimeout + 30000); - - if (useHttps) { - builder.usingHttps(); - - if (useSelfSignedCerts) { - builder.allowSelfSignedCertificates(); - } - } - - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable fetch() throws IOException, InterruptedException { - try { - return getCurrentConsumer().fetch(); - } catch (final IOException e) { - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); - synchronized (this.closeCondition) { - this.closeCondition.wait(this.fetchTimeout); - } - - throw e; - } - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should not - * be closed at this point. - */ - throw new IllegalArgumentException(e); - } - } - - @Override - public String toString() { - return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java deleted file mode 100644 index 7902d4be..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/CambriaPublisherWrapper.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.apiClient.http.HttpClient.ConnectionType; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.List; - -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Cambria based library publisher - */ -public class CambriaPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class); - - /** - * The actual Cambria publisher - */ - @JsonIgnore - protected volatile CambriaBatchingPublisher publisher; - - public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, - boolean useHttps) { - this(servers, topic, apiKey, apiSecret, null, null, useHttps, false); - } - - public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, String username, - String password, boolean useHttps, boolean selfSignedCerts) { - - PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(servers).onTopic(topic); - - // Set read timeout to 30 seconds (TBD: this should be configurable) - builder.withSocketTimeout(30000); - - if (useHttps) { - if (selfSignedCerts) { - builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); - } else { - builder.withConnectionType(ConnectionType.HTTPS); - } - } - - - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); - } - - try { - this.publisher = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException("No message provided"); - } - - try { - this.publisher.send(partitionId, message); - } catch (Exception e) { - logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); - return false; - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - - @Override - public String toString() { - return "CambriaPublisherWrapper []"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java deleted file mode 100644 index 29fbf1d5..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafConsumerWrapper.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.client.impl.MRConsumerImpl; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.Properties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * MR based consumer - */ -public class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private final Properties props; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapAafConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, String aafLogin, - String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - boolean useHttps) throws MalformedURLException { - - super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if (useHttps) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(servers.get(0) + ":3905"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - final MRConsumerImpl consumer = this.consumer; - - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java deleted file mode 100644 index 1308bb36..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapAafPublisherWrapper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -import java.util.List; - -/** - * DmaapClient library wrapper - */ -public class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { - /** - * MR based Publisher - */ - public DmaapAafPublisherWrapper(List servers, String topic, String aafLogin, String aafPassword, - boolean useHttps) { - - super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java deleted file mode 100644 index 0806c3d3..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapConsumerWrapper.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.client.impl.MRConsumerImpl; -import com.att.nsa.mr.client.response.MRConsumerResponse; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.ArrayList; -import java.util.List; - -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * MR based consumer - */ -public abstract class DmaapConsumerWrapper implements BusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * Name of the "protocol" property. - */ - protected static final String PROTOCOL_PROP = "Protocol"; - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, String username, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit) - throws MalformedURLException { - - this.fetchTimeout = fetchTimeout; - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, - null, apiKey, apiSecret); - - this.consumer.setUsername(username); - this.consumer.setPassword(password); - } - - @Override - public Iterable fetch() throws InterruptedException, IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), response.getResponseMessage()); - - if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - - /* fall through */ - } - } - - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); - } - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - this.consumer.close(); - } - - @Override - public String toString() { - return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() - + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost() - + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()=" - + consumer.getUsername() + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java deleted file mode 100644 index fd998e2b..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmeConsumerWrapper.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private final Properties props; - - public DmaapDmeConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, - String longitude, Map additionalProps, boolean useHttps) throws MalformedURLException { - - - - super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit); - - - final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - final String serviceName = servers.get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); - - props = new Properties(); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); - - /* These are required, no defaults */ - props.setProperty("topic", topic); - - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); - } - if (dme2RouteOffer != null) { - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); - - if (useHttps) { - props.setProperty(PROTOCOL_PROP, "https"); - - } else { - props.setProperty(PROTOCOL_PROP, "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (additionalProps != null) { - for (Map.Entry entry : additionalProps.entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java deleted file mode 100644 index e39954ed..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapDmePublisherWrapper.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -import java.util.List; -import java.util.Map; - -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; - -public class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - public DmaapDmePublisherWrapper(List servers, String topic, String username, String password, - String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude, - Map additionalProps, boolean useHttps) { - - super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); - - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - String serviceName = servers.get(0); - - /* These are required, no defaults */ - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); - } - if (dme2RouteOffer != null) { - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - // ServiceName also a default, found in additionalProps - - /* These are optional, will default to these values if not set in optionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "POST"); - - for (Map.Entry entry : additionalProps.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } - - this.publisher.setProps(props); - } - - private IllegalArgumentException parmException(String topic, String propnm) { - return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + propnm + " property for DME2 in DMaaP"); - - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java deleted file mode 100644 index 396580e9..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/impl/DmaapPublisherWrapper.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal.impl; - -import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; -import com.att.nsa.mr.client.response.MRPublisherResponse; -import com.att.nsa.mr.test.clients.ProtocolTypeConstants; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * DmaapClient library wrapper - */ -public abstract class DmaapPublisherWrapper implements BusPublisher { - - private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class); - - /** - * MR based Publisher - */ - protected MRSimplerBatchPublisher publisher; - protected Properties props; - - /** - * MR Publisher Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param username AAF or DME2 Login - * @param password AAF or DME2 Password - */ - public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List servers, String topic, String username, - String password, boolean useHttps) { - - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); - } - - ArrayList dmaapServers = new ArrayList<>(); - if (useHttps) { - for (String server : servers) { - dmaapServers.add(server + ":3905"); - } - - } else { - for (String server : servers) { - dmaapServers.add(server + ":3904"); - } - } - - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - } else if (protocol == ProtocolTypeConstants.DME2) { - ArrayList dmaapServers = new ArrayList<>(); - dmaapServers.add("0.0.0.0:3904"); - - this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); - - this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - } - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - - if (useHttps) { - props.setProperty("Protocol", "https"); - } else { - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); - } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); - } - - /** - * {@inheritDoc} - */ - @Override - public void close() { - logger.info("{}: CLOSE", this); - - try { - this.publisher.close(1, TimeUnit.SECONDS); - } catch (Exception e) { - logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean send(String partitionId, String message) { - if (message == null) { - throw new IllegalArgumentException("No message provided"); - } - - this.publisher.setPubResponse(new MRPublisherResponse()); - this.publisher.send(partitionId, message); - MRPublisherResponse response = this.publisher.sendBatchWithResponse(); - if (response != null) { - logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), response.getResponseMessage()); - } - - return true; - } - - @Override - public String toString() { - return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate() - + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + publisher.getHost() - + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + ", publisher.getUsername()=" - + publisher.getUsername() + "]"; - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java deleted file mode 100644 index 779ab74d..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/impl/ProxyTopicEndpointManager.java +++ /dev/null @@ -1,482 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.impl; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.onap.policy.common.capabilities.Startable; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; -import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSinkFactory; -import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedDmaapTopicSourceFactory; -import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedNoopTopicSinkFactory; -import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSinkFactory; -import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedUebTopicSourceFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implementation of the Topic Endpoint Manager, proxies operations to appropriate - * implementations according to the communication infrastructure that are supported - */ -public class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - /** - * singleton for global access - */ - private static final TopicEndpoint manager = new ProxyTopicEndpointManager(); - - /** - * Get the singelton instance. - * - * @return the instance - */ - public static TopicEndpoint getInstance() { - return manager; - } - - @Override - public List addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - final List sources = new ArrayList<>(); - - sources.addAll(IndexedUebTopicSourceFactory.getInstance().build(properties)); - sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().build(properties)); - - if (this.isLocked()) { - for (final TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - final List sinks = new ArrayList<>(); - - sinks.addAll(IndexedUebTopicSinkFactory.getInstance().build(properties)); - sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().build(properties)); - sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().build(properties)); - - if (this.isLocked()) { - for (final TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List getTopicSources() { - - final List sources = new ArrayList<>(); - - sources.addAll(IndexedUebTopicSourceFactory.getInstance().inventory()); - sources.addAll(IndexedDmaapTopicSourceFactory.getInstance().inventory()); - - return sources; - } - - @Override - public List getTopicSinks() { - - final List sinks = new ArrayList<>(); - - sinks.addAll(IndexedUebTopicSinkFactory.getInstance().inventory()); - sinks.addAll(IndexedDmaapTopicSinkFactory.getInstance().inventory()); - sinks.addAll(IndexedNoopTopicSinkFactory.getInstance().inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List getUebTopicSources() { - return IndexedUebTopicSourceFactory.getInstance().inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSources() { - return IndexedDmaapTopicSourceFactory.getInstance().inventory(); - } - - @JsonIgnore - @Override - public List getUebTopicSinks() { - return IndexedUebTopicSinkFactory.getInstance().inventory(); - } - - @JsonIgnore - @Override - public List getDmaapTopicSinks() { - return IndexedDmaapTopicSinkFactory.getInstance().inventory(); - } - - @JsonIgnore - @Override - public List getNoopTopicSinks() { - return IndexedNoopTopicSinkFactory.getInstance().inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.start() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - final List endpoints = this.getEndpoints(); - - boolean success = true; - for (final Startable endpoint : endpoints) { - try { - success = endpoint.stop() && success; - } catch (final Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List getEndpoints() { - final List endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - IndexedUebTopicSourceFactory.getInstance().destroy(); - IndexedUebTopicSinkFactory.getInstance().destroy(); - IndexedNoopTopicSinkFactory.getInstance().destroy(); - - IndexedDmaapTopicSourceFactory.getInstance().destroy(); - IndexedDmaapTopicSinkFactory.getInstance().destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (this.locked) { - return true; - } - - this.locked = true; - } - - for (final TopicSource source : this.getTopicSources()) { - source.lock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!this.locked) { - return true; - } - - this.locked = false; - } - - for (final TopicSource source : this.getTopicSources()) { - source.unlock(); - } - - for (final TopicSink sink : this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List getTopicSources(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sources = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) { - sources.add(uebSource); - } - } catch (final Exception e) { - logger.debug("No UEB source for topic: {}", topic, e); - } - - try { - final TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) { - sources.add(dmaapSource); - } - } catch (final Exception e) { - logger.debug("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List getTopicSinks(List topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - final List sinks = new ArrayList<>(); - for (final String topic : topicNames) { - try { - final TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) { - sinks.add(uebSink); - } - } catch (final Exception e) { - logger.debug("No UEB sink for topic: {}", topic, e); - } - - try { - final TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) { - sinks.add(dmaapSink); - } - } catch (final Exception e) { - logger.debug("No DMAAP sink for topic: {}", topic, e); - } - - try { - final TopicSink noopSink = this.getNoopTopicSink(topic); - if (noopSink != null) { - sinks.add(noopSink); - } - } catch (final Exception e) { - logger.debug("No NOOP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) { - - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - private IllegalArgumentException parmException(String topicName) { - return new IllegalArgumentException( - "Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) { - if (commType == null) { - throw parmException(topicName); - } - - if (topicName == null) { - throw parmException(topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case NOOP: - return this.getNoopTopicSink(topicName); - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List getTopicSinks(String topicName) { - if (topicName == null) { - throw parmException(topicName); - } - - final List sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - try { - sinks.add(this.getNoopTopicSink(topicName)); - } catch (final Exception e) { - logNoSink(topicName, e); - } - - return sinks; - } - - private void logNoSink(String topicName, Exception ex) { - logger.debug("No sink for topic: {}", topicName, ex); - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return IndexedUebTopicSourceFactory.getInstance().get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return IndexedUebTopicSinkFactory.getInstance().get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return IndexedDmaapTopicSourceFactory.getInstance().get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return IndexedDmaapTopicSinkFactory.getInstance().get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return IndexedNoopTopicSinkFactory.getInstance().get(topicName); - } - -} -- cgit 1.2.3-korg