summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java180
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java215
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java12
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java76
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java96
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java47
6 files changed, 257 insertions, 369 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
index 206018a9..e79d4888 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
@@ -38,57 +38,36 @@ import org.slf4j.LoggerFactory;
* DMAAP Topic Sink Factory.
*/
public interface DmaapTopicSinkFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+ String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ String DME2_VERSION_PROPERTY = "Version";
+ String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
- * Instantiates a new DMAAP Topic Sink.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this sink endpoint managed?
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment,
- String partner, String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Sink.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- * @return an DMAAP Topic Sink
+ * Instantiate a new DMAAP Topic Sink, with following params.
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * userName AAF user name
+ * password AAF password
+ * partitionKey Consumer Group
+ * environment DME2 environment
+ * aftEnvironment DME2 AFT environment
+ * partner DME2 Partner
+ * latitude DME2 latitude
+ * longitude DME2 longitude
+ * additionalProps additional properties to pass to DME2
+ * managed is this sink endpoint managed?
+ * @param busTopicParams parameter object
+ * @return DmaapTopicSink object
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts);
+ DmaapTopicSink build(BusTopicParams busTopicParams);
/**
* Creates an DMAAP Topic Sink based on properties files.
@@ -97,7 +76,7 @@ public interface DmaapTopicSinkFactory {
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<DmaapTopicSink> build(Properties properties);
+ List<DmaapTopicSink> build(Properties properties);
/**
* Instantiates a new DMAAP Topic Sink.
@@ -107,7 +86,7 @@ public interface DmaapTopicSinkFactory {
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSink build(List<String> servers, String topic);
+ DmaapTopicSink build(List<String> servers, String topic);
/**
* Destroys an DMAAP Topic Sink based on a topic.
@@ -115,12 +94,12 @@ public interface DmaapTopicSinkFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all DMAAP Topic Sinks.
*/
- public void destroy();
+ void destroy();
/**
* Gets an DMAAP Topic Sink based on topic name.
@@ -130,14 +109,14 @@ public interface DmaapTopicSinkFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
*/
- public DmaapTopicSink get(String topic);
+ DmaapTopicSink get(String topic);
/**
* Provides a snapshot of the DMAAP Topic Sinks.
*
* @return a list of the DMAAP Topic Sinks
*/
- public List<DmaapTopicSink> inventory();
+ List<DmaapTopicSink> inventory();
}
@@ -160,73 +139,21 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
@Override
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment,
- String partner, String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
+ public DmaapTopicSink build(BusTopicParams busTopicParams){
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
-
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .partitionId(partitionKey)
- .environment(environment)
- .aftEnvironment(aftEnvironment)
- .partner(partner)
- .latitude(latitude)
- .longitude(longitude)
- .additionalProps(additionalProps)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicWriters.put(topic, dmaapTopicSink);
+ if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
+ return dmaapTopicWriters.get(busTopicParams.getTopic());
}
- return dmaapTopicSink;
- }
- }
-
- @Override
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
+ DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams);
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .partitionId(partitionKey)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicWriters.put(topic, dmaapTopicSink);
+ if (busTopicParams.isManaged()) {
+ dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
}
return dmaapTopicSink;
}
@@ -234,7 +161,13 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
@Override
public DmaapTopicSink build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null, null, null, null, true, false, false);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(false)
+ .build());
}
@Override
@@ -374,9 +307,24 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
}
- DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
- partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
- dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
+ DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(aafMechId)
+ .password(aafPassword)
+ .partitionId(partitionKey)
+ .environment(dme2Environment)
+ .aftEnvironment(dme2AftEnvironment)
+ .partner(dme2Partner)
+ .latitude(dme2Latitude)
+ .longitude(dme2Longitude)
+ .additionalProps(dme2AdditionalProps)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
newDmaapTopicSinks.add(dmaapTopicSink);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index cc31c2a5..41611f4e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -38,14 +38,14 @@ import org.slf4j.LoggerFactory;
* DMAAP Topic Source Factory.
*/
public interface DmaapTopicSourceFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+ String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ String DME2_VERSION_PROPERTY = "Version";
+ String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
* Creates an DMAAP Topic Source based on properties files.
@@ -55,64 +55,29 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<DmaapTopicSource> build(Properties properties);
+ List<DmaapTopicSource> build(Properties properties);
/**
* Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
*
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * userName user name
+ * password password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Read Fetch Timeout
+ * fetchLimit Fetch Limit
+ * managed is this endpoind managed?
+ * useHttps does the connection use HTTPS?
+ * allowSelfSignedCerts does connection allow self-signed certificates?
+ * @param busTopicParams parameter object
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ DmaapTopicSource build(BusTopicParams busTopicParams);
/**
* Instantiates a new DMAAP Topic Source.
@@ -125,7 +90,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
+ DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
/**
* Instantiates a new DMAAP Topic Source.
@@ -136,7 +101,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic);
+ DmaapTopicSource build(List<String> servers, String topic);
/**
* Destroys an DMAAP Topic Source based on a topic.
@@ -144,12 +109,12 @@ public interface DmaapTopicSourceFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all DMAAP Topic Sources.
*/
- public void destroy();
+ void destroy();
/**
* Gets an DMAAP Topic Source based on topic name.
@@ -159,14 +124,14 @@ public interface DmaapTopicSourceFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
*/
- public DmaapTopicSource get(String topic);
+ DmaapTopicSource get(String topic);
/**
* Provides a snapshot of the DMAAP Topic Sources.
*
* @return a list of the DMAAP Topic Sources
*/
- public List<DmaapTopicSource> inventory();
+ List<DmaapTopicSource> inventory();
}
@@ -189,94 +154,28 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
- /**
- * {@inheritDoc}
- */
- @Override
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- }
-
- DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .environment(environment)
- .aftEnvironment(aftEnvironment)
- .partner(partner)
- .latitude(latitude)
- .longitude(longitude)
- .additionalProps(additionalProps)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicSources.put(topic, dmaapTopicSource);
- }
-
- return dmaapTopicSource;
- }
- }
/**
* {@inheritDoc}
*/
@Override
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
-
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("DMaaP Server(s) must be provided");
- }
+ public DmaapTopicSource build(BusTopicParams busTopicParams) {
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
+ if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
+ return dmaapTopicSources.get(busTopicParams.getTopic());
}
DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicSources.put(topic, dmaapTopicSource);
- }
+ new SingleThreadedDmaapTopicSource(busTopicParams);
+ if (busTopicParams.isManaged()) {
+ dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
+ }
return dmaapTopicSource;
}
}
@@ -454,10 +353,27 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
}
- DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
- aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
- dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
- useHttps, allowSelfSignedCerts);
+ DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(aafMechId)
+ .password(aafPassword)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .environment(dme2Environment)
+ .aftEnvironment(dme2AftEnvironment)
+ .partner(dme2Partner)
+ .latitude(dme2Latitude)
+ .longitude(dme2Longitude)
+ .additionalProps(dme2AdditionalProps)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
dmaapTopicSourceLst.add(uebTopicSource);
}
@@ -472,8 +388,17 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
@Override
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
- return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
- DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
+ .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(false)
+ .build());
}
/**
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java
index adf79706..d418bfac 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java
@@ -43,7 +43,7 @@ public interface NoopTopicSinkFactory {
* @return a noop topic sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<NoopTopicSink> build(Properties properties);
+ List<NoopTopicSink> build(Properties properties);
/**
* builds a noop sink.
@@ -54,7 +54,7 @@ public interface NoopTopicSinkFactory {
* @return a noop topic sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public NoopTopicSink build(List<String> servers, String topic, boolean managed);
+ NoopTopicSink build(List<String> servers, String topic, boolean managed);
/**
* Destroys a sink based on the topic.
@@ -62,12 +62,12 @@ public interface NoopTopicSinkFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all sinks.
*/
- public void destroy();
+ void destroy();
/**
* gets a sink based on topic name.
@@ -78,14 +78,14 @@ public interface NoopTopicSinkFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the sink is in an incorrect state
*/
- public NoopTopicSink get(String topic);
+ NoopTopicSink get(String topic);
/**
* Provides a snapshot of the UEB Topic Writers.
*
* @return a list of the UEB Topic Writers
*/
- public List<NoopTopicSink> inventory();
+ List<NoopTopicSink> inventory();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
index 2d2e1369..faa8e342 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
@@ -41,18 +41,17 @@ public interface UebTopicSinkFactory {
/**
* Instantiates a new UEB Topic Writer.
*
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- *
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * partitionKey Consumer Group
+ * managed is this sink endpoint managed?
+ * @param busTopicParams parameter object
* @return an UEB Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ UebTopicSink build(BusTopicParams busTopicParams);
/**
* Creates an UEB Topic Writer based on properties files.
@@ -62,7 +61,7 @@ public interface UebTopicSinkFactory {
* @return an UEB Topic Writer
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<UebTopicSink> build(Properties properties);
+ List<UebTopicSink> build(Properties properties);
/**
* Instantiates a new UEB Topic Writer.
@@ -73,7 +72,7 @@ public interface UebTopicSinkFactory {
* @return an UEB Topic Writer
* @throws IllegalArgumentException if invalid parameters are present
*/
- public UebTopicSink build(List<String> servers, String topic);
+ UebTopicSink build(List<String> servers, String topic);
/**
* Destroys an UEB Topic Writer based on a topic.
@@ -81,12 +80,12 @@ public interface UebTopicSinkFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all UEB Topic Writers.
*/
- public void destroy();
+ void destroy();
/**
* gets an UEB Topic Writer based on topic name.
@@ -97,14 +96,14 @@ public interface UebTopicSinkFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the UEB Topic Reader is an incorrect state
*/
- public UebTopicSink get(String topic);
+ UebTopicSink get(String topic);
/**
* Provides a snapshot of the UEB Topic Writers.
*
* @return a list of the UEB Topic Writers
*/
- public List<UebTopicSink> inventory();
+ List<UebTopicSink> inventory();
}
@@ -127,34 +126,25 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
@Override
- public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
+ public UebTopicSink build(BusTopicParams busTopicParams) {
- if (servers == null || servers.isEmpty()) {
+ if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
throw new IllegalArgumentException("UEB Server(s) must be provided");
}
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (uebTopicSinks.containsKey(topic)) {
- return uebTopicSinks.get(topic);
+ if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
+ return uebTopicSinks.get(busTopicParams.getTopic());
}
- UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .partitionId(partitionKey)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- uebTopicSinks.put(topic, uebTopicWriter);
+ UebTopicSink uebTopicWriter = new InlineUebTopicSink(busTopicParams);
+
+ if (busTopicParams.isManaged()) {
+ uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
}
return uebTopicWriter;
@@ -164,7 +154,13 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
@Override
public UebTopicSink build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null, null, true, false, false);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(false)
+ .build());
}
@@ -229,8 +225,16 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
}
- UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed,
- useHttps, allowSelfSignedCerts);
+ UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .partitionId(partitionKey)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
newUebTopicSinks.add(uebTopicWriter);
}
return newUebTopicSinks;
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
index c4a69831..1245127a 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
@@ -46,28 +46,25 @@ public interface UebTopicSourceFactory {
* @return an UEB Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<UebTopicSource> build(Properties properties);
+ List<UebTopicSource> build(Properties properties);
/**
* Instantiates a new UEB Topic Source.
*
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param managed is this source endpoint managed?
- *
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Read Fetch Timeout
+ * fetchLimit Fetch Limit
+ * managed is this source endpoint managed?
+ * @param busTopicParams parameters object
* @return an UEB Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts);
+ UebTopicSource build(BusTopicParams busTopicParams);
/**
* Instantiates a new UEB Topic Source.
@@ -80,7 +77,7 @@ public interface UebTopicSourceFactory {
* @return an UEB Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
+ UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
/**
* Instantiates a new UEB Topic Source.
@@ -91,7 +88,7 @@ public interface UebTopicSourceFactory {
* @return an UEB Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public UebTopicSource build(List<String> servers, String topic);
+ UebTopicSource build(List<String> servers, String topic);
/**
* Destroys an UEB Topic Source based on a topic.
@@ -99,12 +96,12 @@ public interface UebTopicSourceFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all UEB Topic Sources.
*/
- public void destroy();
+ void destroy();
/**
* Gets an UEB Topic Source based on topic name.
@@ -114,14 +111,14 @@ public interface UebTopicSourceFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the UEB Topic Source is an incorrect state
*/
- public UebTopicSource get(String topic);
+ UebTopicSource get(String topic);
/**
* Provides a snapshot of the UEB Topic Sources.
*
* @return a list of the UEB Topic Sources
*/
- public List<UebTopicSource> inventory();
+ List<UebTopicSource> inventory();
}
@@ -147,37 +144,24 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
* {@inheritDoc}
*/
@Override
- public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts) {
- if (servers == null || servers.isEmpty()) {
+ public UebTopicSource build(BusTopicParams busTopicParams) {
+ if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
throw new IllegalArgumentException("UEB Server(s) must be provided");
}
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (uebTopicSources.containsKey(topic)) {
- return uebTopicSources.get(topic);
+ if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
+ return uebTopicSources.get(busTopicParams.getTopic());
}
- UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- uebTopicSources.put(topic, uebTopicSource);
+ UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(busTopicParams);
+
+ if (busTopicParams.isManaged()) {
+ uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
}
return uebTopicSource;
@@ -277,8 +261,18 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
}
- UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup,
- consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
+ UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts).build());
newUebTopicSources.add(uebTopicSource);
}
}
@@ -291,8 +285,16 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
@Override
public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
- return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
- UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
+ .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(true).build());
}
/**
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
index f9eca081..347ae42c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -69,80 +69,81 @@ public class BusTopicParams {
private String longitude;
private Map<String, String> additionalProps;
private String partitionId;
+ private boolean managed;
- String getPartitionId() {
+ public String getPartitionId() {
return partitionId;
}
- String getUserName() {
+ public String getUserName() {
return userName;
}
- String getPassword() {
+ public String getPassword() {
return password;
}
- String getEnvironment() {
+ public String getEnvironment() {
return environment;
}
- String getAftEnvironment() {
+ public String getAftEnvironment() {
return aftEnvironment;
}
- String getPartner() {
+ public String getPartner() {
return partner;
}
- String getLatitude() {
+ public String getLatitude() {
return latitude;
}
- String getLongitude() {
+ public String getLongitude() {
return longitude;
}
- Map<String, String> getAdditionalProps() {
+ public Map<String, String> getAdditionalProps() {
return additionalProps;
}
- List<String> getServers() {
+ public List<String> getServers() {
return servers;
}
- String getTopic() {
+ public String getTopic() {
return topic;
}
- String getApiKey() {
+ public String getApiKey() {
return apiKey;
}
- String getApiSecret() {
+ public String getApiSecret() {
return apiSecret;
}
- String getConsumerGroup() {
+ public String getConsumerGroup() {
return consumerGroup;
}
- String getConsumerInstance() {
+ public String getConsumerInstance() {
return consumerInstance;
}
- int getFetchTimeout() {
+ public int getFetchTimeout() {
return fetchTimeout;
}
- int getFetchLimit() {
+ public int getFetchLimit() {
return fetchLimit;
}
- boolean isUseHttps() {
+ public boolean isUseHttps() {
return useHttps;
}
- boolean isAllowSelfSignedCerts() {
+ public boolean isAllowSelfSignedCerts() {
return allowSelfSignedCerts;
}
@@ -207,6 +208,10 @@ public class BusTopicParams {
return (partitionId == null || partitionId.trim().isEmpty());
}
+ public boolean isManaged() {
+ return managed;
+ }
+
public static class TopicParamsBuilder {
BusTopicParams m = new BusTopicParams();
@@ -312,6 +317,10 @@ public class BusTopicParams {
return m;
}
+ public TopicParamsBuilder managed(boolean managed) {
+ this.m.managed = managed;
+ return this;
+ }
}
}