aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java215
1 files changed, 70 insertions, 145 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index cc31c2a5..41611f4e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -38,14 +38,14 @@ import org.slf4j.LoggerFactory;
* DMAAP Topic Source Factory.
*/
public interface DmaapTopicSourceFactory {
- public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
- public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
- public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
- public final String DME2_VERSION_PROPERTY = "Version";
- public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
- public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
- public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
- public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
+ String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
+ String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
+ String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+ String DME2_VERSION_PROPERTY = "Version";
+ String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
+ String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
+ String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
+ String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
/**
* Creates an DMAAP Topic Source based on properties files.
@@ -55,64 +55,29 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public List<DmaapTopicSource> build(Properties properties);
+ List<DmaapTopicSource> build(Properties properties);
/**
* Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
- *
- * @return an DMAAP Topic Source
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
-
- /**
- * Instantiates a new DMAAP Topic Source.
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName user name
- * @param password password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Read Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
- * @param additionalProps additional properties to pass to DME2
- * @param managed is this endpoind managed?
- * @param useHttps does the connection use HTTPS?
- * @param allowSelfSignedCerts does connection allow self-signed certificates?
*
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * userName user name
+ * password password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Read Fetch Timeout
+ * fetchLimit Fetch Limit
+ * managed is this endpoind managed?
+ * useHttps does the connection use HTTPS?
+ * allowSelfSignedCerts does connection allow self-signed certificates?
+ * @param busTopicParams parameter object
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ DmaapTopicSource build(BusTopicParams busTopicParams);
/**
* Instantiates a new DMAAP Topic Source.
@@ -125,7 +90,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
+ DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
/**
* Instantiates a new DMAAP Topic Source.
@@ -136,7 +101,7 @@ public interface DmaapTopicSourceFactory {
* @return an DMAAP Topic Source
* @throws IllegalArgumentException if invalid parameters are present
*/
- public DmaapTopicSource build(List<String> servers, String topic);
+ DmaapTopicSource build(List<String> servers, String topic);
/**
* Destroys an DMAAP Topic Source based on a topic.
@@ -144,12 +109,12 @@ public interface DmaapTopicSourceFactory {
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
- public void destroy(String topic);
+ void destroy(String topic);
/**
* Destroys all DMAAP Topic Sources.
*/
- public void destroy();
+ void destroy();
/**
* Gets an DMAAP Topic Source based on topic name.
@@ -159,14 +124,14 @@ public interface DmaapTopicSourceFactory {
* @throws IllegalArgumentException if an invalid topic is provided
* @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
*/
- public DmaapTopicSource get(String topic);
+ DmaapTopicSource get(String topic);
/**
* Provides a snapshot of the DMAAP Topic Sources.
*
* @return a list of the DMAAP Topic Sources
*/
- public List<DmaapTopicSource> inventory();
+ List<DmaapTopicSource> inventory();
}
@@ -189,94 +154,28 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
- /**
- * {@inheritDoc}
- */
- @Override
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException(MISSING_TOPIC);
- }
-
- synchronized (this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
- }
-
- DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .environment(environment)
- .aftEnvironment(aftEnvironment)
- .partner(partner)
- .latitude(latitude)
- .longitude(longitude)
- .additionalProps(additionalProps)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicSources.put(topic, dmaapTopicSource);
- }
-
- return dmaapTopicSource;
- }
- }
/**
* {@inheritDoc}
*/
@Override
- public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
- boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
-
- if (servers == null || servers.isEmpty()) {
- throw new IllegalArgumentException("DMaaP Server(s) must be provided");
- }
+ public DmaapTopicSource build(BusTopicParams busTopicParams) {
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
synchronized (this) {
- if (dmaapTopicSources.containsKey(topic)) {
- return dmaapTopicSources.get(topic);
+ if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) {
+ return dmaapTopicSources.get(busTopicParams.getTopic());
}
DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
- .servers(servers)
- .topic(topic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(userName)
- .password(password)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- if (managed) {
- dmaapTopicSources.put(topic, dmaapTopicSource);
- }
+ new SingleThreadedDmaapTopicSource(busTopicParams);
+ if (busTopicParams.isManaged()) {
+ dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource);
+ }
return dmaapTopicSource;
}
}
@@ -454,10 +353,27 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
}
- DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
- aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
- dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
- useHttps, allowSelfSignedCerts);
+ DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(aafMechId)
+ .password(aafPassword)
+ .consumerGroup(consumerGroup)
+ .consumerInstance(consumerInstance)
+ .fetchTimeout(fetchTimeout)
+ .fetchLimit(fetchLimit)
+ .environment(dme2Environment)
+ .aftEnvironment(dme2AftEnvironment)
+ .partner(dme2Partner)
+ .latitude(dme2Latitude)
+ .longitude(dme2Longitude)
+ .additionalProps(dme2AdditionalProps)
+ .managed(managed)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
dmaapTopicSourceLst.add(uebTopicSource);
}
@@ -472,8 +388,17 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
*/
@Override
public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
- return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
- DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
+ return this.build(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
+ .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
+ .managed(true)
+ .useHttps(false)
+ .allowSelfSignedCerts(false)
+ .build());
}
/**