aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java180
1 files changed, 64 insertions, 116 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
index 206018a9..e79d4888 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
@@ -38,57 +38,36 @@ import org.slf4j.LoggerFactory;
* DMAAP Topic Sink Factory.
*/
public interface DmaapTopicSinkFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+ String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ String DME2_VERSION_PROPERTY = "Version";
+ String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
- * Instantiates a new DMAAP Topic Sink.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this sink endpoint managed?
- * @return an DMAAP Topic Sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment,
- String partner, String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Sink.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- * @return an DMAAP Topic Sink
+ * Instantiate a new DMAAP Topic Sink, with following params.
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * userName AAF user name
+ * password AAF password
+ * partitionKey Consumer Group
+ * environment DME2 environment
+ * aftEnvironment DME2 AFT environment
+ * partner DME2 Partner
+ * latitude DME2 latitude
+ * longitude DME2 longitude
+ * additionalProps additional properties to pass to DME2
+ * managed is this sink endpoint managed?
+ * @param busTopicParams parameter object
+ * @return DmaapTopicSink object
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed,
- boolean useHttps, boolean allowSelfSignedCerts);
+ DmaapTopicSink build(BusTopicParams busTopicParams);
/**
* Creates an DMAAP Topic Sink based on properties files.
@@ -97,7 +76,7 @@ public interface DmaapTopicSinkFactory {
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<DmaapTopicSink> build(Properties properties);
+ List<DmaapTopicSink> build(Properties properties);
/**
* Instantiates a new DMAAP Topic Sink.
@@ -107,7 +86,7 @@ public interface DmaapTopicSinkFactory {
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSink build(List<String> servers, String topic);
+ DmaapTopicSink build(List<String> servers, String topic);
/**
* Destroys an DMAAP Topic Sink based on a topic.
@@ -115,12 +94,12 @@ public interface DmaapTopicSinkFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all DMAAP Topic Sinks.
*/
- public void destroy();
+ void destroy();
/**
* Gets an DMAAP Topic Sink based on topic name.
@@ -130,14 +109,14 @@ public interface DmaapTopicSinkFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
*/
- public DmaapTopicSink get(String topic);
+ DmaapTopicSink get(String topic);
/**
* Provides a snapshot of the DMAAP Topic Sinks.
*
* @return a list of the DMAAP Topic Sinks
*/
- public List<DmaapTopicSink> inventory();
+ List<DmaapTopicSink> inventory();
}
@@ -160,73 +139,21 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>();
@Override
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment,
- String partner, String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
+ public DmaapTopicSink build(BusTopicParams busTopicParams){
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
-
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .partitionId(partitionKey)
- .environment(environment)
- .aftEnvironment(aftEnvironment)
- .partner(partner)
- .latitude(latitude)
- .longitude(longitude)
- .additionalProps(additionalProps)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicWriters.put(topic, dmaapTopicSink);
+ if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) {
+ return dmaapTopicWriters.get(busTopicParams.getTopic());
}
- return dmaapTopicSink;
- }
- }
-
- @Override
- public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
- synchronized (this) {
- if (dmaapTopicWriters.containsKey(topic)) {
- return dmaapTopicWriters.get(topic);
- }
+ DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams);
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .partitionId(partitionKey)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicWriters.put(topic, dmaapTopicSink);
+ if (busTopicParams.isManaged()) {
+ dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink);
}
return dmaapTopicSink;
}
@@ -234,7 +161,13 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
@Override
public DmaapTopicSink build(List<String> servers, String topic) {
- return this.build(servers, topic, null, null, null, null, null, true, false, false);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(false)
+ .build());
}
@Override
@@ -374,9 +307,24 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
}
- DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword,
- partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude,
- dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
+ DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(aafMechId)
+ .password(aafPassword)
+ .partitionId(partitionKey)
+ .environment(dme2Environment)
+ .aftEnvironment(dme2AftEnvironment)
+ .partner(dme2Partner)
+ .latitude(dme2Latitude)
+ .longitude(dme2Longitude)
+ .additionalProps(dme2AdditionalProps)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
newDmaapTopicSinks.add(dmaapTopicSink);
}