diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java | 180 |
1 files changed, 64 insertions, 116 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 206018a9..e79d4888 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -38,57 +38,36 @@ import org.slf4j.LoggerFactory; * DMAAP Topic Sink Factory. */ public interface DmaapTopicSinkFactory { - public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public final String DME2_VERSION_PROPERTY = "Version"; - public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + String DME2_VERSION_PROPERTY = "Version"; + String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude - * @param additionalProps additional properties to pass to DME2 - * @param managed is this sink endpoint managed? - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, - String partner, String latitude, String longitude, Map<String, String> additionalProps, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); - - /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * @return an DMAAP Topic Sink + * Instantiate a new DMAAP Topic Sink, with following params. + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * userName AAF user name + * password AAF password + * partitionKey Consumer Group + * environment DME2 environment + * aftEnvironment DME2 AFT environment + * partner DME2 Partner + * latitude DME2 latitude + * longitude DME2 longitude + * additionalProps additional properties to pass to DME2 + * managed is this sink endpoint managed? + * @param busTopicParams parameter object + * @return DmaapTopicSink object * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts); + DmaapTopicSink build(BusTopicParams busTopicParams); /** * Creates an DMAAP Topic Sink based on properties files. @@ -97,7 +76,7 @@ public interface DmaapTopicSinkFactory { * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ - public List<DmaapTopicSink> build(Properties properties); + List<DmaapTopicSink> build(Properties properties); /** * Instantiates a new DMAAP Topic Sink. @@ -107,7 +86,7 @@ public interface DmaapTopicSinkFactory { * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink build(List<String> servers, String topic); + DmaapTopicSink build(List<String> servers, String topic); /** * Destroys an DMAAP Topic Sink based on a topic. @@ -115,12 +94,12 @@ public interface DmaapTopicSinkFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all DMAAP Topic Sinks. */ - public void destroy(); + void destroy(); /** * Gets an DMAAP Topic Sink based on topic name. @@ -130,14 +109,14 @@ public interface DmaapTopicSinkFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state */ - public DmaapTopicSink get(String topic); + DmaapTopicSink get(String topic); /** * Provides a snapshot of the DMAAP Topic Sinks. * * @return a list of the DMAAP Topic Sinks */ - public List<DmaapTopicSink> inventory(); + List<DmaapTopicSink> inventory(); } @@ -160,73 +139,21 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); @Override - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, - String partner, String latitude, String longitude, Map<String, String> additionalProps, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + public DmaapTopicSink build(BusTopicParams busTopicParams){ - if (topic == null || topic.isEmpty()) { + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .partitionId(partitionKey) - .environment(environment) - .aftEnvironment(aftEnvironment) - .partner(partner) - .latitude(latitude) - .longitude(longitude) - .additionalProps(additionalProps) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); + if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { + return dmaapTopicWriters.get(busTopicParams.getTopic()); } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams); - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .partitionId(partitionKey) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); + if (busTopicParams.isManaged()) { + dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); } return dmaapTopicSink; } @@ -234,7 +161,13 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List<String> servers, String topic) { - return this.build(servers, topic, null, null, null, null, null, true, false, false); + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); } @Override @@ -374,9 +307,24 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); } - DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, - partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, - dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(aafMechId) + .password(aafPassword) + .partitionId(partitionKey) + .environment(dme2Environment) + .aftEnvironment(dme2AftEnvironment) + .partner(dme2Partner) + .latitude(dme2Latitude) + .longitude(dme2Longitude) + .additionalProps(dme2AdditionalProps) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); newDmaapTopicSinks.add(dmaapTopicSink); } |