summaryrefslogtreecommitdiffstats
path: root/policy-endpoints
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java1207
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java365
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java827
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java339
4 files changed, 1374 insertions, 1364 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
index cc3705ee..09ee9a4e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
import org.onap.policy.drools.event.comm.bus.NoopTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSource;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
import org.onap.policy.drools.properties.Lockable;
import org.onap.policy.drools.properties.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
- * Abstraction to managed the system's Networked Topic Endpoints,
- * sources of all events input into the System.
+ * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into
+ * the System.
*/
public interface TopicEndpoint extends Startable, Lockable {
-
- /**
- * Add Topic Sources to the communication infrastructure initialized per
- * properties
- *
- * @param properties properties for Topic Source construction
- * @return a generic Topic Source
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSource> addTopicSources(Properties properties);
-
- /**
- * Add Topic Sinks to the communication infrastructure initialized per
- * properties
- *
- * @param properties properties for Topic Sink construction
- * @return a generic Topic Sink
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSink> addTopicSinks(Properties properties);
-
- /**
- * gets all Topic Sources
- * @return the Topic Source List
- */
- List<TopicSource> getTopicSources();
-
- /**
- * get the Topic Sources for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source List
- * @throws IllegalStateException if the entity is in an invalid state
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSource> getTopicSources(List<String> topicNames);
-
- /**
- * gets the Topic Source for the given topic name and
- * underlying communication infrastructure type
- *
- * @param commType communication infrastructure type
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- * @throws UnsupportedOperationException if the operation is not supported.
- */
- public TopicSource getTopicSource(Topic.CommInfrastructure commType,
- String topicName)
- throws UnsupportedOperationException;
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the UEB Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource getUebTopicSource(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the DMAAP Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource getDmaapTopicSource(String topicName);
-
- /**
- * get the Topic Sinks for the given topic name
- *
- * @param topicNames the topic names
- * @return the Topic Sink List
- * @throws IllegalStateException
- * @throws IllegalArgumentException
- */
- public List<TopicSink> getTopicSinks(List<String> topicNames);
-
- /**
- * get the Topic Sinks for the given topic name and
- * underlying communication infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public TopicSink getTopicSink(Topic.CommInfrastructure commType,
- String topicName)
- throws UnsupportedOperationException;
-
- /**
- * get the Topic Sinks for the given topic name and
- * all the underlying communication infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSink> getTopicSinks(String topicName);
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSink getUebTopicSink(String topicName);
-
- /**
- * get the no-op Topic Sink for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink getNoopTopicSink(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink getDmaapTopicSink(String topicName);
-
- /**
- * gets only the UEB Topic Sources
- * @return the UEB Topic Source List
- */
- public List<UebTopicSource> getUebTopicSources();
-
- /**
- * gets only the DMAAP Topic Sources
- * @return the DMAAP Topic Source List
- */
- public List<DmaapTopicSource> getDmaapTopicSources();
-
- /**
- * gets all Topic Sinks
- * @return the Topic Sink List
- */
- public List<TopicSink> getTopicSinks();
-
- /**
- * gets only the UEB Topic Sinks
- * @return the UEB Topic Sink List
- */
- public List<UebTopicSink> getUebTopicSinks();
-
- /**
- * gets only the DMAAP Topic Sinks
- * @return the DMAAP Topic Sink List
- */
- public List<DmaapTopicSink> getDmaapTopicSinks();
-
- /**
- * gets only the NOOP Topic Sinks
- * @return the NOOP Topic Sinks List
- */
- public List<NoopTopicSink> getNoopTopicSinks();
-
- /**
- * singleton for global access
- */
- public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
+
+ /**
+ * Add Topic Sources to the communication infrastructure initialized per properties
+ *
+ * @param properties properties for Topic Source construction
+ * @return a generic Topic Source
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<TopicSource> addTopicSources(Properties properties);
+
+ /**
+ * Add Topic Sinks to the communication infrastructure initialized per properties
+ *
+ * @param properties properties for Topic Sink construction
+ * @return a generic Topic Sink
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<TopicSink> addTopicSinks(Properties properties);
+
+ /**
+ * gets all Topic Sources
+ *
+ * @return the Topic Source List
+ */
+ List<TopicSource> getTopicSources();
+
+ /**
+ * get the Topic Sources for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source List
+ * @throws IllegalStateException if the entity is in an invalid state
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<TopicSource> getTopicSources(List<String> topicNames);
+
+ /**
+ * gets the Topic Source for the given topic name and underlying communication infrastructure type
+ *
+ * @param commType communication infrastructure type
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ * @throws UnsupportedOperationException if the operation is not supported.
+ */
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException;
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the UEB Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSource getUebTopicSource(String topicName);
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the DMAAP Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSource getDmaapTopicSource(String topicName);
+
+ /**
+ * get the Topic Sinks for the given topic name
+ *
+ * @param topicNames the topic names
+ * @return the Topic Sink List
+ * @throws IllegalStateException
+ * @throws IllegalArgumentException
+ */
+ public List<TopicSink> getTopicSinks(List<String> topicNames);
+
+ /**
+ * get the Topic Sinks for the given topic name and underlying communication infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException;
+
+ /**
+ * get the Topic Sinks for the given topic name and all the underlying communication
+ * infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<TopicSink> getTopicSinks(String topicName);
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSink getUebTopicSink(String topicName);
+
+ /**
+ * get the no-op Topic Sink for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink getNoopTopicSink(String topicName);
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSink getDmaapTopicSink(String topicName);
+
+ /**
+ * gets only the UEB Topic Sources
+ *
+ * @return the UEB Topic Source List
+ */
+ public List<UebTopicSource> getUebTopicSources();
+
+ /**
+ * gets only the DMAAP Topic Sources
+ *
+ * @return the DMAAP Topic Source List
+ */
+ public List<DmaapTopicSource> getDmaapTopicSources();
+
+ /**
+ * gets all Topic Sinks
+ *
+ * @return the Topic Sink List
+ */
+ public List<TopicSink> getTopicSinks();
+
+ /**
+ * gets only the UEB Topic Sinks
+ *
+ * @return the UEB Topic Sink List
+ */
+ public List<UebTopicSink> getUebTopicSinks();
+
+ /**
+ * gets only the DMAAP Topic Sinks
+ *
+ * @return the DMAAP Topic Sink List
+ */
+ public List<DmaapTopicSink> getDmaapTopicSinks();
+
+ /**
+ * gets only the NOOP Topic Sinks
+ *
+ * @return the NOOP Topic Sinks List
+ */
+ public List<NoopTopicSink> getNoopTopicSinks();
+
+ /**
+ * singleton for global access
+ */
+ public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
}
+
/*
* ----------------- implementation -------------------
*/
@@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable {
* implementations according to the communication infrastructure that are supported
*/
class ProxyTopicEndpointManager implements TopicEndpoint {
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
- /**
- * Is this element locked?
- */
- protected volatile boolean locked = false;
-
- /**
- * Is this element alive?
- */
- protected volatile boolean alive = false;
-
- @Override
- public List<TopicSource> addTopicSources(Properties properties) {
-
- // 1. Create UEB Sources
- // 2. Create DMAAP Sources
-
- List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.build(properties));
- sources.addAll(DmaapTopicSource.factory.build(properties));
-
- if (this.isLocked()) {
- for (TopicSource source : sources) {
- source.lock();
- }
- }
-
- return sources;
- }
-
- @Override
- public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create DMAAP Sinks
-
- List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.build(properties));
- sinks.addAll(DmaapTopicSink.factory.build(properties));
- sinks.addAll(NoopTopicSink.factory.build(properties));
-
- if (this.isLocked()) {
- for (TopicSink sink : sinks) {
- sink.lock();
- }
- }
-
- return sinks;
- }
-
- @Override
- public List<TopicSource> getTopicSources() {
-
- List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.inventory());
- sources.addAll(DmaapTopicSource.factory.inventory());
-
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks() {
-
- List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.inventory());
- sinks.addAll(DmaapTopicSink.factory.inventory());
- sinks.addAll(NoopTopicSink.factory.inventory());
-
- return sinks;
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSource> getDmaapTopicSources() {
- return DmaapTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSink> getDmaapTopicSinks() {
- return DmaapTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<NoopTopicSink> getNoopTopicSinks() {
- return NoopTopicSink.factory.inventory();
- }
-
- @Override
- public boolean start() {
-
- synchronized (this) {
- if (this.locked) {
- throw new IllegalStateException(this + " is locked");
- }
-
- if (this.alive) {
- return true;
- }
-
- this.alive = true;
- }
-
- List<Startable> endpoints = getEndpoints();
-
- boolean success = true;
- for (Startable endpoint: endpoints) {
- try {
- success = endpoint.start() && success;
- } catch (Exception e) {
- success = false;
- logger.error("Problem starting endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
-
- @Override
- public boolean stop() {
-
- /*
- * stop regardless if it is locked, in other
- * words, stop operation has precedence over
- * locks.
- */
- synchronized (this) {
- this.alive = false;
- }
-
- List<Startable> endpoints = getEndpoints();
-
- boolean success = true;
- for (Startable endpoint: endpoints) {
- try {
- success = endpoint.stop() && success;
- } catch (Exception e) {
- success = false;
- logger.error("Problem stopping endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
- /**
- *
- * @return list of managed endpoints
- */
- @JsonIgnore
- protected List<Startable> getEndpoints() {
- List<Startable> endpoints = new ArrayList<>();
-
- endpoints.addAll(this.getTopicSources());
- endpoints.addAll(this.getTopicSinks());
-
- return endpoints;
- }
-
- @Override
- public void shutdown() {
- UebTopicSource.factory.destroy();
- UebTopicSink.factory.destroy();
-
- DmaapTopicSource.factory.destroy();
- DmaapTopicSink.factory.destroy();
- }
-
- @Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public boolean lock() {
-
- synchronized (this) {
- if (locked)
- return true;
-
- this.locked = true;
- }
-
- for (TopicSource source: this.getTopicSources()) {
- source.lock();
- }
-
- for (TopicSink sink: this.getTopicSinks()) {
- sink.lock();
- }
-
- return true;
- }
-
- @Override
- public boolean unlock() {
- synchronized (this) {
- if (!locked)
- return true;
-
- this.locked = false;
- }
-
- for (TopicSource source: this.getTopicSources()) {
- source.unlock();
- }
-
- for (TopicSink sink: this.getTopicSinks()) {
- sink.unlock();
- }
-
- return true;
- }
-
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
- public List<TopicSource> getTopicSources(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- List<TopicSource> sources = new ArrayList<>();
- for (String topic: topicNames) {
- try {
- TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null)
- sources.add(uebSource);
- } catch (Exception e) {
- logger.info("No UEB source for topic: {}", topic, e);
- }
-
- try {
- TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null)
- sources.add(dmaapSource);
- } catch (Exception e) {
- logger.info("No DMAAP source for topic: {}", topic, e);
- }
- }
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- List<TopicSink> sinks = new ArrayList<>();
- for (String topic: topicNames) {
- try {
- TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null)
- sinks.add(uebSink);
- } catch (Exception e) {
- logger.info("No UEB sink for topic: {}", topic, e);
- }
-
- try {
- TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null)
- sinks.add(dmaapSink);
- } catch (Exception e) {
- logger.info("No DMAAP sink for topic: {}", topic, e);
- }
- }
- return sinks;
- }
-
- @Override
- public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
- throws UnsupportedOperationException {
-
- if (commType == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSource(topicName);
- case DMAAP:
- return this.getDmaapTopicSource(topicName);
- case REST:
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
- throws UnsupportedOperationException {
- if (commType == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSink(topicName);
- case DMAAP:
- return this.getDmaapTopicSink(topicName);
- case REST:
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public List<TopicSink> getTopicSinks(String topicName) {
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- List<TopicSink> sinks = new ArrayList<>();
-
- try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (Exception e) {
- logger.debug("No sink for topic: {}", topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (Exception e) {
- logger.debug("No sink for topic: {}", topicName, e);
- }
-
- return sinks;
- }
-
- @Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicSource.factory.get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicSink.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSource getDmaapTopicSource(String topicName) {
- return DmaapTopicSource.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSink getDmaapTopicSink(String topicName) {
- return DmaapTopicSink.factory.get(topicName);
- }
-
- @Override
- public NoopTopicSink getNoopTopicSink(String topicName) {
- return NoopTopicSink.factory.get(topicName);
- }
-
+ /**
+ * Logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
+ /**
+ * Is this element locked?
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * Is this element alive?
+ */
+ protected volatile boolean alive = false;
+
+ @Override
+ public List<TopicSource> addTopicSources(Properties properties) {
+
+ // 1. Create UEB Sources
+ // 2. Create DMAAP Sources
+
+ final List<TopicSource> sources = new ArrayList<>();
+
+ sources.addAll(UebTopicSource.factory.build(properties));
+ sources.addAll(DmaapTopicSource.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (final TopicSource source : sources) {
+ source.lock();
+ }
+ }
+
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> addTopicSinks(Properties properties) {
+ // 1. Create UEB Sinks
+ // 2. Create DMAAP Sinks
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ sinks.addAll(UebTopicSink.factory.build(properties));
+ sinks.addAll(DmaapTopicSink.factory.build(properties));
+ sinks.addAll(NoopTopicSink.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (final TopicSink sink : sinks) {
+ sink.lock();
+ }
+ }
+
+ return sinks;
+ }
+
+ @Override
+ public List<TopicSource> getTopicSources() {
+
+ final List<TopicSource> sources = new ArrayList<>();
+
+ sources.addAll(UebTopicSource.factory.inventory());
+ sources.addAll(DmaapTopicSource.factory.inventory());
+
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks() {
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ sinks.addAll(UebTopicSink.factory.inventory());
+ sinks.addAll(DmaapTopicSink.factory.inventory());
+ sinks.addAll(NoopTopicSink.factory.inventory());
+
+ return sinks;
+ }
+
+ @JsonIgnore
+ @Override
+ public List<UebTopicSource> getUebTopicSources() {
+ return UebTopicSource.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSource> getDmaapTopicSources() {
+ return DmaapTopicSource.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<UebTopicSink> getUebTopicSinks() {
+ return UebTopicSink.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSink> getDmaapTopicSinks() {
+ return DmaapTopicSink.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<NoopTopicSink> getNoopTopicSinks() {
+ return NoopTopicSink.factory.inventory();
+ }
+
+ @Override
+ public boolean start() {
+
+ synchronized (this) {
+ if (this.locked) {
+ throw new IllegalStateException(this + " is locked");
+ }
+
+ if (this.alive) {
+ return true;
+ }
+
+ this.alive = true;
+ }
+
+ final List<Startable> endpoints = this.getEndpoints();
+
+ boolean success = true;
+ for (final Startable endpoint : endpoints) {
+ try {
+ success = endpoint.start() && success;
+ } catch (final Exception e) {
+ success = false;
+ logger.error("Problem starting endpoint: {}", endpoint, e);
+ }
+ }
+
+ return success;
+ }
+
+
+ @Override
+ public boolean stop() {
+
+ /*
+ * stop regardless if it is locked, in other words, stop operation has precedence over locks.
+ */
+ synchronized (this) {
+ this.alive = false;
+ }
+
+ final List<Startable> endpoints = this.getEndpoints();
+
+ boolean success = true;
+ for (final Startable endpoint : endpoints) {
+ try {
+ success = endpoint.stop() && success;
+ } catch (final Exception e) {
+ success = false;
+ logger.error("Problem stopping endpoint: {}", endpoint, e);
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ *
+ * @return list of managed endpoints
+ */
+ @JsonIgnore
+ protected List<Startable> getEndpoints() {
+ final List<Startable> endpoints = new ArrayList<>();
+
+ endpoints.addAll(this.getTopicSources());
+ endpoints.addAll(this.getTopicSinks());
+
+ return endpoints;
+ }
+
+ @Override
+ public void shutdown() {
+ UebTopicSource.factory.destroy();
+ UebTopicSink.factory.destroy();
+ NoopTopicSink.factory.destroy();
+
+ DmaapTopicSource.factory.destroy();
+ DmaapTopicSink.factory.destroy();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ @Override
+ public boolean lock() {
+
+ synchronized (this) {
+ if (this.locked)
+ return true;
+
+ this.locked = true;
+ }
+
+ for (final TopicSource source : this.getTopicSources()) {
+ source.lock();
+ }
+
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.lock();
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ synchronized (this) {
+ if (!this.locked)
+ return true;
+
+ this.locked = false;
+ }
+
+ for (final TopicSource source : this.getTopicSources()) {
+ source.unlock();
+ }
+
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.unlock();
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public List<TopicSource> getTopicSources(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSource> sources = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSource uebSource = this.getUebTopicSource(topic);
+ if (uebSource != null)
+ sources.add(uebSource);
+ } catch (final Exception e) {
+ logger.debug("No UEB source for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+ if (dmaapSource != null)
+ sources.add(dmaapSource);
+ } catch (final Exception e) {
+ logger.debug("No DMAAP source for topic: {}", topic, e);
+ }
+ }
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSink uebSink = this.getUebTopicSink(topic);
+ if (uebSink != null)
+ sinks.add(uebSink);
+ } catch (final Exception e) {
+ logger.debug("No UEB sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+ if (dmaapSink != null)
+ sinks.add(dmaapSink);
+ } catch (final Exception e) {
+ logger.debug("No DMAAP sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink noopSink = this.getNoopTopicSink(topic);
+ if (noopSink != null)
+ sinks.add(noopSink);
+ } catch (final Exception e) {
+ logger.debug("No NOOP sink for topic: {}", topic, e);
+ }
+ }
+ return sinks;
+ }
+
+ @Override
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException {
+
+ if (commType == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSource(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSource(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ @Override
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException {
+ if (commType == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSink(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSink(topicName);
+ case NOOP:
+ return this.getNoopTopicSink(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(String topicName) {
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ try {
+ sinks.add(this.getUebTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ try {
+ sinks.add(this.getDmaapTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ try {
+ sinks.add(this.getNoopTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ return sinks;
+ }
+
+ @Override
+ public UebTopicSource getUebTopicSource(String topicName) {
+ return UebTopicSource.factory.get(topicName);
+ }
+
+ @Override
+ public UebTopicSink getUebTopicSink(String topicName) {
+ return UebTopicSink.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSource getDmaapTopicSource(String topicName) {
+ return DmaapTopicSource.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSink getDmaapTopicSink(String topicName) {
+ return DmaapTopicSink.factory.get(topicName);
+ }
+
+ @Override
+ public NoopTopicSink getNoopTopicSink(String topicName) {
+ return NoopTopicSink.factory.get(topicName);
+ }
+
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
index 946e48c0..53315391 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory;
* Noop Topic Sink Factory
*/
public interface NoopTopicSinkFactory {
-
- /**
- * Creates noop topic sinks based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<NoopTopicSink> build(Properties properties);
-
- /**
- * builds a noop sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param managed is this sink endpoint managed?
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink build(List<String> servers, String topic, boolean managed);
-
- /**
- * Destroys a sink based on the topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * gets a sink based on topic name
- * @param topic the topic name
- *
- * @return a sink with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the sink is in an incorrect state
- */
- public NoopTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers
- * @return a list of the UEB Topic Writers
- */
- public List<NoopTopicSink> inventory();
-
- /**
- * Destroys all sinks
- */
- public void destroy();
+
+ /**
+ * Creates noop topic sinks based on properties files
+ *
+ * @param properties Properties containing initialization values
+ *
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<NoopTopicSink> build(Properties properties);
+
+ /**
+ * builds a noop sink
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param managed is this sink endpoint managed?
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed);
+
+ /**
+ * Destroys a sink based on the topic
+ *
+ * @param topic topic name
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public void destroy(String topic);
+
+ /**
+ * gets a sink based on topic name
+ *
+ * @param topic the topic name
+ *
+ * @return a sink with topic name
+ * @throws IllegalArgumentException if an invalid topic is provided
+ * @throws IllegalStateException if the sink is in an incorrect state
+ */
+ public NoopTopicSink get(String topic);
+
+ /**
+ * Provides a snapshot of the UEB Topic Writers
+ *
+ * @return a list of the UEB Topic Writers
+ */
+ public List<NoopTopicSink> inventory();
+
+ /**
+ * Destroys all sinks
+ */
+ public void destroy();
}
+
/* ------------- implementation ----------------- */
/**
* Factory of noop sinks
*/
class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * noop topic sinks map
- */
- protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
-
- @Override
- public List<NoopTopicSink> build(Properties properties) {
-
- String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
- if (sinkTopics == null || sinkTopics.isEmpty()) {
- logger.info("{}: no topic for noop sink", this);
- return new ArrayList<>();
- }
-
- List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
- List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
- synchronized(this) {
- for (String topic: sinkTopicList) {
- if (this.noopTopicSinks.containsKey(topic)) {
- newSinks.add(this.noopTopicSinks.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- if (servers == null || servers.isEmpty())
- servers = "noop";
-
- List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- NoopTopicSink noopSink = this.build(serverList, topic, managed);
- newSinks.add(noopSink);
- }
- return newSinks;
- }
- }
-
- @Override
- public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
- if (servers == null) {
- servers = new ArrayList<>();
- }
-
- if (servers.isEmpty()) {
- servers.add("noop");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- synchronized (this) {
- if (noopTopicSinks.containsKey(topic)) {
- return noopTopicSinks.get(topic);
- }
-
- NoopTopicSink sink =
- new NoopTopicSink(servers, topic);
-
- if (managed)
- noopTopicSinks.put(topic, sink);
-
- return sink;
- }
- }
-
- @Override
- public void destroy(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- NoopTopicSink noopSink;
- synchronized(this) {
- if (!noopTopicSinks.containsKey(topic)) {
- return;
- }
-
- noopSink = noopTopicSinks.remove(topic);
- }
-
- noopSink.shutdown();
- }
-
- @Override
- public void destroy() {
- List<NoopTopicSink> sinks = this.inventory();
- for (NoopTopicSink sink: sinks) {
- sink.shutdown();
- }
-
- synchronized(this) {
- this.noopTopicSinks.clear();
- }
- }
-
- @Override
- public NoopTopicSink get(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- synchronized(this) {
- if (noopTopicSinks.containsKey(topic)) {
- return noopTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public List<NoopTopicSink> inventory() {
- return new ArrayList<>(this.noopTopicSinks.values());
- }
+ /**
+ * Logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
+
+ /**
+ * noop topic sinks map
+ */
+ protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
+
+ @Override
+ public List<NoopTopicSink> build(Properties properties) {
+
+ final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
+ if (sinkTopics == null || sinkTopics.isEmpty()) {
+ logger.info("{}: no topic for noop sink", this);
+ return new ArrayList<>();
+ }
+
+ final List<String> sinkTopicList =
+ new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
+ final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
+ synchronized (this) {
+ for (final String topic : sinkTopicList) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ newSinks.add(this.noopTopicSinks.get(topic));
+ continue;
+ }
+
+ String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+ if (servers == null || servers.isEmpty())
+ servers = "noop";
+
+ final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+ final String managedString =
+ properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+ boolean managed = true;
+ if (managedString != null && !managedString.isEmpty()) {
+ managed = Boolean.parseBoolean(managedString);
+ }
+
+ final NoopTopicSink noopSink = this.build(serverList, topic, managed);
+ newSinks.add(noopSink);
+ }
+ return newSinks;
+ }
+ }
+
+ @Override
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
+ if (servers == null) {
+ servers = new ArrayList<>();
+ }
+
+ if (servers.isEmpty()) {
+ servers.add("noop");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ return this.noopTopicSinks.get(topic);
+ }
+
+ final NoopTopicSink sink = new NoopTopicSink(servers, topic);
+
+ if (managed)
+ this.noopTopicSinks.put(topic, sink);
+
+ return sink;
+ }
+ }
+
+ @Override
+ public void destroy(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ NoopTopicSink noopSink;
+ synchronized (this) {
+ if (!this.noopTopicSinks.containsKey(topic)) {
+ return;
+ }
+
+ noopSink = this.noopTopicSinks.remove(topic);
+ }
+
+ noopSink.shutdown();
+ }
+
+ @Override
+ public void destroy() {
+ final List<NoopTopicSink> sinks = this.inventory();
+ for (final NoopTopicSink sink : sinks) {
+ sink.shutdown();
+ }
+
+ synchronized (this) {
+ this.noopTopicSinks.clear();
+ }
+ }
+
+ @Override
+ public NoopTopicSink get(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ return this.noopTopicSinks.get(topic);
+ } else {
+ throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+ }
+ }
+ }
+
+ @Override
+ public List<NoopTopicSink> inventory() {
+ return new ArrayList<>(this.noopTopicSinks.values());
+ }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index edb03bba..8171c35d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,427 +34,432 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
import com.att.nsa.mr.client.response.MRConsumerResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
/**
* Wrapper around libraries to consume from message bus
*
*/
public interface BusConsumer {
-
- /**
- * fetch messages
- *
- * @return list of messages
- * @throws Exception when error encountered by underlying libraries
- */
- public Iterable<String> fetch() throws InterruptedException, IOException;
-
- /**
- * close underlying library consumer
- */
- public void close();
-
- /**
- * Cambria based consumer
- */
- public static class CambriaConsumerWrapper implements BusConsumer {
-
- /**
- * Cambria client
- */
- protected CambriaConsumer consumer;
-
- /**
- * Cambria Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws GeneralSecurityException
- * @throws MalformedURLException
- */
- public CambriaConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
- throws IllegalArgumentException {
-
- ConsumerBuilder builder =
- new CambriaClientBuilders.ConsumerBuilder();
-
-
- if (useHttps){
-
- if(useSelfSignedCerts){
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit)
- .usingHttps()
- .allowSelfSignedCertificates();
- }
- else{
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit)
- .usingHttps();
- }
- }
- else{
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit);
- }
-
- if (apiKey != null && !apiKey.isEmpty() &&
- apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- return this.consumer.fetch();
- }
-
- @Override
- public void close() {
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper []";
- }
- }
-
- /**
- * MR based consumer
- */
- public abstract class DmaapConsumerWrapper implements BusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * MR Consumer
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String username, String password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps)
- throws MalformedURLException {
-
- this.fetchTimeout = fetchTimeout;
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImpl(servers, topic,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit,
- null, apiKey, apiSecret);
-
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
- }
-
- @Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
- MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}" +
- response.getResponseCode(),
- response.getResponseMessage());
-
- if (response.getResponseCode() == null ||
- !response.getResponseCode().equals("200")) {
-
- logger.error("DMaaP consumer received: {} : {}",
- response.getResponseCode(),
- response.getResponseMessage());
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null)
- return new ArrayList<>();
- else
- return response.getActualMessages();
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.
- append("DmaapConsumerWrapper [").
- append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
- append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
- append(", consumer.getHost()=").append(consumer.getHost()).
- append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
- append(", consumer.getUsername()=").append(consumer.getUsername()).
- append("]");
- return builder.toString();
- }
- }
-
- /**
- * MR based consumer
- */
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private Properties props;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapAafConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String aafLogin, String aafPassword,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
-
- super(servers, topic, apiKey, apiSecret,
- aafLogin, aafPassword,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && servers.get(0).equals("")) ||
- (servers == null) || (servers.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if(useHttps){
- props.setProperty("Protocol", "https");
- this.consumer.setHost(servers.get(0) + ":3905");
-
- }
- else{
- props.setProperty("Protocol", "http");
- this.consumer.setHost(servers.get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
-
- builder.
- append("DmaapConsumerWrapper [").
- append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
- append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
- append(", consumer.getHost()=").append(consumer.getHost()).
- append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
- append(", consumer.getUsername()=").append(consumer.getUsername()).
- append("]");
- return builder.toString();
- }
- }
-
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private Properties props;
-
- public DmaapDmeConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String dme2Login, String dme2Password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String dme2Partner,
- String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
-
-
-
- super(servers, topic, apiKey, apiSecret,
- dme2Login, dme2Password,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps);
-
-
- String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
- } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
- } if (latitude == null || latitude.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
- } if (longitude == null || longitude.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
- PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- String serviceName = servers.get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
-
- props = new Properties();
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
-
- /* These are required, no defaults */
- props.setProperty("topic", topic);
-
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- if (dme2Partner != null)
- props.setProperty("Partner", dme2Partner);
- if (dme2RouteOffer != null)
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
-
- if(useHttps){
- props.setProperty("Protocol", "https");
-
- }
- else{
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (additionalProps != null) {
- for(String key : additionalProps.keySet())
- props.put(key, additionalProps.get(key));
- }
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
- }
-}
+ /**
+ * fetch messages
+ *
+ * @return list of messages
+ * @throws Exception when error encountered by underlying libraries
+ */
+ public Iterable<String> fetch() throws InterruptedException, IOException;
+
+ /**
+ * close underlying library consumer
+ */
+ public void close();
+
+ /**
+ * Cambria based consumer
+ */
+ public static class CambriaConsumerWrapper implements BusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+
+ /**
+ * Cambria client
+ */
+ protected CambriaConsumer consumer;
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * Cambria Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws GeneralSecurityException
+ * @throws MalformedURLException
+ */
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ if (useHttps) {
+
+ if (useSelfSignedCerts) {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
+ .allowSelfSignedCertificates();
+ } else {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ }
+ } else {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+ }
+
+ if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ try {
+ this.consumer = builder.build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException, InterruptedException {
+ try {
+ return this.consumer.fetch();
+ } catch (final IOException e) {
+ logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
+ this.fetchTimeout);
+ synchronized (this.closeCondition) {
+ this.closeCondition.wait(this.fetchTimeout);
+ }
+
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
+ return builder.toString();
+ }
+ }
+
+ /**
+ * MR based consumer
+ */
+ public abstract class DmaapConsumerWrapper implements BusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * MR Consumer
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param username AAF Login
+ * @param password AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String username, String password, String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, null, apiKey, apiSecret);
+
+ this.consumer.setUsername(username);
+ this.consumer.setPassword(password);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws InterruptedException, IOException {
+ final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+ if (response == null) {
+ logger.warn("{}: DMaaP NULL response received", this);
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+ return new ArrayList<>();
+ } else {
+ logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
+ response.getResponseMessage());
+
+ if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
+
+ logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+
+ /* fall through */
+ }
+ }
+
+ if (response.getActualMessages() == null)
+ return new ArrayList<>();
+ else
+ return response.getActualMessages();
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
+ .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
+ .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
+ .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
+ .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
+ return builder.toString();
+ }
+ }
+ /**
+ * MR based consumer
+ */
+ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
+
+ private final Properties props;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String aafLogin, String aafPassword, String consumerGroup,
+ String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps)
+ throws MalformedURLException {
+
+ super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup,
+ consumerInstance, fetchTimeout, fetchLimit, useHttps);
+
+ // super constructor sets servers = {""} if empty to avoid errors when using DME2
+ if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null)
+ || (servers.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ props = new Properties();
+
+ if (useHttps) {
+ props.setProperty("Protocol", "https");
+ this.consumer.setHost(servers.get(0) + ":3905");
+
+ } else {
+ props.setProperty("Protocol", "http");
+ this.consumer.setHost(servers.get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ logger.info("{}: CREATION", this);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ final MRConsumerImpl consumer = this.consumer;
+
+ builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
+ .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
+ .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
+ .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
+ .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
+ return builder.toString();
+ }
+ }
+
+ public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
+
+ private final Properties props;
+
+ public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String dme2Login, String dme2Password, String consumerGroup,
+ String consumerInstance, int fetchTimeout, int fetchLimit, String environment,
+ String aftEnvironment, String dme2Partner, String latitude, String longitude,
+ Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
+
+
+
+ super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup,
+ consumerInstance, fetchTimeout, fetchLimit, useHttps);
+
+
+ final String dme2RouteOffer =
+ additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+ if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+ if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing "
+ + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+ if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty())
+ && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ final String serviceName = servers.get(0);
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ this.consumer.setUsername(dme2Login);
+ this.consumer.setPassword(dme2Password);
+
+ props = new Properties();
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ props.setProperty("username", dme2Login);
+ props.setProperty("password", dme2Password);
+
+ /* These are required, no defaults */
+ props.setProperty("topic", topic);
+
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "GET");
+
+ if (useHttps) {
+ props.setProperty("Protocol", "https");
+
+ } else {
+ props.setProperty("Protocol", "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+
+ if (additionalProps != null) {
+ for (final String key : additionalProps.keySet())
+ props.put(key, additionalProps.get(key));
+ }
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ logger.info("{}: CREATION", this);
+ }
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
index f7ef7bcf..dd9a7c2b 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ package org.onap.policy.drools.http.server.test;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
@@ -33,186 +34,170 @@ import org.junit.Test;
import org.onap.policy.drools.http.client.HttpClient;
import org.onap.policy.drools.http.server.HttpServletServer;
import org.onap.policy.drools.properties.PolicyProperties;
+import org.onap.policy.drools.utils.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpClientTest {
-
- private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class);
-
- @BeforeClass
- public static void setUp() throws InterruptedException {
- logger.info("-- setup() --");
-
- /* echo server */
-
- HttpServletServer echoServerNoAuth = HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true);
- echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
- echoServerNoAuth.waitedStart(5000);
-
- /* no auth echo server */
-
- HttpServletServer echoServerAuth = HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true);
- echoServerAuth.setBasicAuthentication("x", "y", null);
- echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
- echoServerAuth.waitedStart(5000);
- }
-
- @AfterClass
- public static void tearDown() {
- logger.info("-- tearDown() --");
-
- HttpServletServer.factory.destroy();
- HttpClient.factory.destroy();
- }
-
- @Test
- public void testHttpNoAuthClient() throws Exception {
- logger.info("-- testHttpNoAuthClient() --");
-
- HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false,
- "localhost", 6666, "junit/echo",
- null, null, true);
- Response response = client.get("hello");
- String body = HttpClient.getBody(response, String.class);
-
- assertTrue(response.getStatus() == 200);
- assertTrue(body.equals("hello"));
- }
-
- @Test
- public void testHttpAuthClient() throws Exception {
- logger.info("-- testHttpAuthClient() --");
-
- HttpClient client = HttpClient.factory.build("testHttpAuthClient",false, false,
- "localhost", 6667, "junit/echo",
- "x", "y", true);
- Response response = client.get("hello");
- String body = HttpClient.getBody(response, String.class);
-
- assertTrue(response.getStatus() == 200);
- assertTrue(body.equals("hello"));
- }
-
- @Test
- public void testHttpAuthClient401() throws Exception {
- logger.info("-- testHttpAuthClient401() --");
-
- HttpClient client = HttpClient.factory.build("testHttpAuthClient401",false, false,
- "localhost", 6667, "junit/echo",
- null, null, true);
- Response response = client.get("hello");
- assertTrue(response.getStatus() == 401);
- }
-
- @Test
- public void testHttpAuthClientProps() throws Exception {
- logger.info("-- testHttpAuthClientProps() --");
-
- Properties httpProperties = new Properties();
-
- httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7777");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpap");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- RestMockHealthCheck.class.getName());
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7778");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- RestMockHealthCheck.class.getName());
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7777");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX,
- "pap/test");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
- "false");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpap");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7778");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX,
- "pdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
- "false");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties);
- assertTrue(servers.size() == 2);
-
- ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties);
- assertTrue(clients.size() == 2);
-
- for (HttpServletServer server: servers) {
- server.waitedStart(5000);
- }
-
- HttpClient clientPAP = HttpClient.factory.get("PAP");
- Response response = clientPAP.get();
- assertTrue(response.getStatus() == 200);
-
- HttpClient clientPDP = HttpClient.factory.get("PDP");
- Response response2 = clientPDP.get("test");
- assertTrue(response2.getStatus() == 500);
+
+ private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class);
+
+ @BeforeClass
+ public static void setUp() throws InterruptedException, IOException {
+ logger.info("-- setup() --");
+
+ /* echo server */
+
+ final HttpServletServer echoServerNoAuth =
+ HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true);
+ echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
+ echoServerNoAuth.waitedStart(5000);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", echoServerNoAuth.getPort(), 5, 10000L))
+ throw new IllegalStateException("cannot connect to port " + echoServerNoAuth.getPort());
+
+ /* no auth echo server */
+
+ final HttpServletServer echoServerAuth =
+ HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true);
+ echoServerAuth.setBasicAuthentication("x", "y", null);
+ echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
+ echoServerAuth.waitedStart(5000);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", echoServerAuth.getPort(), 5, 10000L))
+ throw new IllegalStateException("cannot connect to port " + echoServerAuth.getPort());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ logger.info("-- tearDown() --");
+
+ HttpServletServer.factory.destroy();
+ HttpClient.factory.destroy();
+ }
+
+ @Test
+ public void testHttpNoAuthClient() throws Exception {
+ logger.info("-- testHttpNoAuthClient() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false,
+ "localhost", 6666, "junit/echo", null, null, true);
+ final Response response = client.get("hello");
+ final String body = HttpClient.getBody(response, String.class);
+
+ assertTrue(response.getStatus() == 200);
+ assertTrue(body.equals("hello"));
+ }
+
+ @Test
+ public void testHttpAuthClient() throws Exception {
+ logger.info("-- testHttpAuthClient() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpAuthClient", false, false,
+ "localhost", 6667, "junit/echo", "x", "y", true);
+ final Response response = client.get("hello");
+ final String body = HttpClient.getBody(response, String.class);
+
+ assertTrue(response.getStatus() == 200);
+ assertTrue(body.equals("hello"));
+ }
+
+ @Test
+ public void testHttpAuthClient401() throws Exception {
+ logger.info("-- testHttpAuthClient401() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpAuthClient401", false, false,
+ "localhost", 6667, "junit/echo", null, null, true);
+ final Response response = client.get("hello");
+ assertTrue(response.getStatus() == 401);
+ }
+
+ @Test
+ public void testHttpAuthClientProps() throws Exception {
+ logger.info("-- testHttpAuthClientProps() --");
+
+ final Properties httpProperties = new Properties();
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(
+ PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
+ RestMockHealthCheck.class.getName());
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(
+ PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
+ RestMockHealthCheck.class.getName());
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pap/test");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ final ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties);
+ assertTrue(servers.size() == 2);
+
+ final ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties);
+ assertTrue(clients.size() == 2);
+
+ for (final HttpServletServer server : servers) {
+ server.waitedStart(10000);
}
+ final HttpClient clientPAP = HttpClient.factory.get("PAP");
+ final Response response = clientPAP.get();
+ assertTrue(response.getStatus() == 200);
+
+ final HttpClient clientPDP = HttpClient.factory.get("PDP");
+ final Response response2 = clientPDP.get("test");
+ assertTrue(response2.getStatus() == 500);
+ }
+
}