diff options
Diffstat (limited to 'policy-endpoints')
6 files changed, 257 insertions, 369 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 206018a9..e79d4888 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -38,57 +38,36 @@ import org.slf4j.LoggerFactory; * DMAAP Topic Sink Factory. */ public interface DmaapTopicSinkFactory { - public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public final String DME2_VERSION_PROPERTY = "Version"; - public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + String DME2_VERSION_PROPERTY = "Version"; + String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude - * @param additionalProps additional properties to pass to DME2 - * @param managed is this sink endpoint managed? - * @return an DMAAP Topic Sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, - String partner, String latitude, String longitude, Map<String, String> additionalProps, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); - - /** - * Instantiates a new DMAAP Topic Sink. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * @return an DMAAP Topic Sink + * Instantiate a new DMAAP Topic Sink, with following params. + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * userName AAF user name + * password AAF password + * partitionKey Consumer Group + * environment DME2 environment + * aftEnvironment DME2 AFT environment + * partner DME2 Partner + * latitude DME2 latitude + * longitude DME2 longitude + * additionalProps additional properties to pass to DME2 + * managed is this sink endpoint managed? + * @param busTopicParams parameter object + * @return DmaapTopicSink object * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts); + DmaapTopicSink build(BusTopicParams busTopicParams); /** * Creates an DMAAP Topic Sink based on properties files. @@ -97,7 +76,7 @@ public interface DmaapTopicSinkFactory { * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ - public List<DmaapTopicSink> build(Properties properties); + List<DmaapTopicSink> build(Properties properties); /** * Instantiates a new DMAAP Topic Sink. @@ -107,7 +86,7 @@ public interface DmaapTopicSinkFactory { * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSink build(List<String> servers, String topic); + DmaapTopicSink build(List<String> servers, String topic); /** * Destroys an DMAAP Topic Sink based on a topic. @@ -115,12 +94,12 @@ public interface DmaapTopicSinkFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all DMAAP Topic Sinks. */ - public void destroy(); + void destroy(); /** * Gets an DMAAP Topic Sink based on topic name. @@ -130,14 +109,14 @@ public interface DmaapTopicSinkFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state */ - public DmaapTopicSink get(String topic); + DmaapTopicSink get(String topic); /** * Provides a snapshot of the DMAAP Topic Sinks. * * @return a list of the DMAAP Topic Sinks */ - public List<DmaapTopicSink> inventory(); + List<DmaapTopicSink> inventory(); } @@ -160,73 +139,21 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); @Override - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, - String partner, String latitude, String longitude, Map<String, String> additionalProps, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + public DmaapTopicSink build(BusTopicParams busTopicParams){ - if (topic == null || topic.isEmpty()) { + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } - - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .partitionId(partitionKey) - .environment(environment) - .aftEnvironment(aftEnvironment) - .partner(partner) - .latitude(latitude) - .longitude(longitude) - .additionalProps(additionalProps) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); + if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { + return dmaapTopicWriters.get(busTopicParams.getTopic()); } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams); - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .partitionId(partitionKey) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicWriters.put(topic, dmaapTopicSink); + if (busTopicParams.isManaged()) { + dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); } return dmaapTopicSink; } @@ -234,7 +161,13 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List<String> servers, String topic) { - return this.build(servers, topic, null, null, null, null, null, true, false, false); + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); } @Override @@ -374,9 +307,24 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); } - DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, - partitionKey, dme2Environment, dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, - dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(aafMechId) + .password(aafPassword) + .partitionId(partitionKey) + .environment(dme2Environment) + .aftEnvironment(dme2AftEnvironment) + .partner(dme2Partner) + .latitude(dme2Latitude) + .longitude(dme2Longitude) + .additionalProps(dme2AdditionalProps) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); newDmaapTopicSinks.add(dmaapTopicSink); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index cc31c2a5..41611f4e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -38,14 +38,14 @@ import org.slf4j.LoggerFactory; * DMAAP Topic Source Factory. */ public interface DmaapTopicSourceFactory { - public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - public final String DME2_VERSION_PROPERTY = "Version"; - public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + String DME2_VERSION_PROPERTY = "Version"; + String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; /** * Creates an DMAAP Topic Source based on properties files. @@ -55,64 +55,29 @@ public interface DmaapTopicSourceFactory { * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public List<DmaapTopicSource> build(Properties properties); + List<DmaapTopicSource> build(Properties properties); /** * Instantiates a new DMAAP Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName user name - * @param password password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param managed is this endpoind managed? - * @param useHttps does the connection use HTTPS? - * @param allowSelfSignedCerts does connection allow self-signed certificates? - * - * @return an DMAAP Topic Source - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); - - /** - * Instantiates a new DMAAP Topic Source. - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName user name - * @param password password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude - * @param additionalProps additional properties to pass to DME2 - * @param managed is this endpoind managed? - * @param useHttps does the connection use HTTPS? - * @param allowSelfSignedCerts does connection allow self-signed certificates? * + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * userName user name + * password password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Read Fetch Timeout + * fetchLimit Fetch Limit + * managed is this endpoind managed? + * useHttps does the connection use HTTPS? + * allowSelfSignedCerts does connection allow self-signed certificates? + * @param busTopicParams parameter object * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map<String, String> additionalProps, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + DmaapTopicSource build(BusTopicParams busTopicParams); /** * Instantiates a new DMAAP Topic Source. @@ -125,7 +90,7 @@ public interface DmaapTopicSourceFactory { * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); + DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); /** * Instantiates a new DMAAP Topic Source. @@ -136,7 +101,7 @@ public interface DmaapTopicSourceFactory { * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource build(List<String> servers, String topic); + DmaapTopicSource build(List<String> servers, String topic); /** * Destroys an DMAAP Topic Source based on a topic. @@ -144,12 +109,12 @@ public interface DmaapTopicSourceFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all DMAAP Topic Sources. */ - public void destroy(); + void destroy(); /** * Gets an DMAAP Topic Source based on topic name. @@ -159,14 +124,14 @@ public interface DmaapTopicSourceFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state */ - public DmaapTopicSource get(String topic); + DmaapTopicSource get(String topic); /** * Provides a snapshot of the DMAAP Topic Sources. * * @return a list of the DMAAP Topic Sources */ - public List<DmaapTopicSource> inventory(); + List<DmaapTopicSource> inventory(); } @@ -189,94 +154,28 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { */ protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); - /** - * {@inheritDoc} - */ - @Override - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String partner, String latitude, String longitude, - Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } - - DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .environment(environment) - .aftEnvironment(aftEnvironment) - .partner(partner) - .latitude(latitude) - .longitude(longitude) - .additionalProps(additionalProps) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicSources.put(topic, dmaapTopicSource); - } - - return dmaapTopicSource; - } - } /** * {@inheritDoc} */ @Override - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { - - if (servers == null || servers.isEmpty()) { - throw new IllegalArgumentException("DMaaP Server(s) must be provided"); - } + public DmaapTopicSource build(BusTopicParams busTopicParams) { - if (topic == null || topic.isEmpty()) { + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); + if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) { + return dmaapTopicSources.get(busTopicParams.getTopic()); } DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(userName) - .password(password) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - dmaapTopicSources.put(topic, dmaapTopicSource); - } + new SingleThreadedDmaapTopicSource(busTopicParams); + if (busTopicParams.isManaged()) { + dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); + } return dmaapTopicSource; } } @@ -454,10 +353,27 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { } - DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, - aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment, - dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed, - useHttps, allowSelfSignedCerts); + DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(aafMechId) + .password(aafPassword) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .environment(dme2Environment) + .aftEnvironment(dme2AftEnvironment) + .partner(dme2Partner) + .latitude(dme2Latitude) + .longitude(dme2Longitude) + .additionalProps(dme2AdditionalProps) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); dmaapTopicSourceLst.add(uebTopicSource); } @@ -472,8 +388,17 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { */ @Override public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, - DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false); + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH) + .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); } /** diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java index adf79706..d418bfac 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkFactory.java @@ -43,7 +43,7 @@ public interface NoopTopicSinkFactory { * @return a noop topic sink * @throws IllegalArgumentException if invalid parameters are present */ - public List<NoopTopicSink> build(Properties properties); + List<NoopTopicSink> build(Properties properties); /** * builds a noop sink. @@ -54,7 +54,7 @@ public interface NoopTopicSinkFactory { * @return a noop topic sink * @throws IllegalArgumentException if invalid parameters are present */ - public NoopTopicSink build(List<String> servers, String topic, boolean managed); + NoopTopicSink build(List<String> servers, String topic, boolean managed); /** * Destroys a sink based on the topic. @@ -62,12 +62,12 @@ public interface NoopTopicSinkFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all sinks. */ - public void destroy(); + void destroy(); /** * gets a sink based on topic name. @@ -78,14 +78,14 @@ public interface NoopTopicSinkFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the sink is in an incorrect state */ - public NoopTopicSink get(String topic); + NoopTopicSink get(String topic); /** * Provides a snapshot of the UEB Topic Writers. * * @return a list of the UEB Topic Writers */ - public List<NoopTopicSink> inventory(); + List<NoopTopicSink> inventory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java index 2d2e1369..faa8e342 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -41,18 +41,17 @@ public interface UebTopicSinkFactory { /** * Instantiates a new UEB Topic Writer. * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * partitionKey Consumer Group + * managed is this sink endpoint managed? + * @param busTopicParams parameter object * @return an UEB Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + UebTopicSink build(BusTopicParams busTopicParams); /** * Creates an UEB Topic Writer based on properties files. @@ -62,7 +61,7 @@ public interface UebTopicSinkFactory { * @return an UEB Topic Writer * @throws IllegalArgumentException if invalid parameters are present */ - public List<UebTopicSink> build(Properties properties); + List<UebTopicSink> build(Properties properties); /** * Instantiates a new UEB Topic Writer. @@ -73,7 +72,7 @@ public interface UebTopicSinkFactory { * @return an UEB Topic Writer * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSink build(List<String> servers, String topic); + UebTopicSink build(List<String> servers, String topic); /** * Destroys an UEB Topic Writer based on a topic. @@ -81,12 +80,12 @@ public interface UebTopicSinkFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all UEB Topic Writers. */ - public void destroy(); + void destroy(); /** * gets an UEB Topic Writer based on topic name. @@ -97,14 +96,14 @@ public interface UebTopicSinkFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the UEB Topic Reader is an incorrect state */ - public UebTopicSink get(String topic); + UebTopicSink get(String topic); /** * Provides a snapshot of the UEB Topic Writers. * * @return a list of the UEB Topic Writers */ - public List<UebTopicSink> inventory(); + List<UebTopicSink> inventory(); } @@ -127,34 +126,25 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); @Override - public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + public UebTopicSink build(BusTopicParams busTopicParams) { - if (servers == null || servers.isEmpty()) { + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { throw new IllegalArgumentException("UEB Server(s) must be provided"); } - if (topic == null || topic.isEmpty()) { + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); + if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { + return uebTopicSinks.get(busTopicParams.getTopic()); } - UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .partitionId(partitionKey) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - uebTopicSinks.put(topic, uebTopicWriter); + UebTopicSink uebTopicWriter = new InlineUebTopicSink(busTopicParams); + + if (busTopicParams.isManaged()) { + uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); } return uebTopicWriter; @@ -164,7 +154,13 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { @Override public UebTopicSink build(List<String> servers, String topic) { - return this.build(servers, topic, null, null, null, true, false, false); + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); } @@ -229,8 +225,16 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); } - UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed, - useHttps, allowSelfSignedCerts); + UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .partitionId(partitionKey) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); newUebTopicSinks.add(uebTopicWriter); } return newUebTopicSinks; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index c4a69831..1245127a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -46,28 +46,25 @@ public interface UebTopicSourceFactory { * @return an UEB Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public List<UebTopicSource> build(Properties properties); + List<UebTopicSource> build(Properties properties); /** * Instantiates a new UEB Topic Source. * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Read Fetch Timeout - * @param fetchLimit Fetch Limit - * @param managed is this source endpoint managed? - * + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Read Fetch Timeout + * fetchLimit Fetch Limit + * managed is this source endpoint managed? + * @param busTopicParams parameters object * @return an UEB Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts); + UebTopicSource build(BusTopicParams busTopicParams); /** * Instantiates a new UEB Topic Source. @@ -80,7 +77,7 @@ public interface UebTopicSourceFactory { * @return an UEB Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); + UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret); /** * Instantiates a new UEB Topic Source. @@ -91,7 +88,7 @@ public interface UebTopicSourceFactory { * @return an UEB Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public UebTopicSource build(List<String> servers, String topic); + UebTopicSource build(List<String> servers, String topic); /** * Destroys an UEB Topic Source based on a topic. @@ -99,12 +96,12 @@ public interface UebTopicSourceFactory { * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ - public void destroy(String topic); + void destroy(String topic); /** * Destroys all UEB Topic Sources. */ - public void destroy(); + void destroy(); /** * Gets an UEB Topic Source based on topic name. @@ -114,14 +111,14 @@ public interface UebTopicSourceFactory { * @throws IllegalArgumentException if an invalid topic is provided * @throws IllegalStateException if the UEB Topic Source is an incorrect state */ - public UebTopicSource get(String topic); + UebTopicSource get(String topic); /** * Provides a snapshot of the UEB Topic Sources. * * @return a list of the UEB Topic Sources */ - public List<UebTopicSource> inventory(); + List<UebTopicSource> inventory(); } @@ -147,37 +144,24 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { * {@inheritDoc} */ @Override - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts) { - if (servers == null || servers.isEmpty()) { + public UebTopicSource build(BusTopicParams busTopicParams) { + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { throw new IllegalArgumentException("UEB Server(s) must be provided"); } - if (topic == null || topic.isEmpty()) { + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); } synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); + if (uebTopicSources.containsKey(busTopicParams.getTopic())) { + return uebTopicSources.get(busTopicParams.getTopic()); } - UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - if (managed) { - uebTopicSources.put(topic, uebTopicSource); + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(busTopicParams); + + if (busTopicParams.isManaged()) { + uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); } return uebTopicSource; @@ -277,8 +261,18 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); } - UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, - consumerInstance, fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); + UebTopicSource uebTopicSource = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts).build()); newUebTopicSources.add(uebTopicSource); } } @@ -291,8 +285,16 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { @Override public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH) + .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(true).build()); } /** diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index f9eca081..347ae42c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -69,80 +69,81 @@ public class BusTopicParams { private String longitude; private Map<String, String> additionalProps; private String partitionId; + private boolean managed; - String getPartitionId() { + public String getPartitionId() { return partitionId; } - String getUserName() { + public String getUserName() { return userName; } - String getPassword() { + public String getPassword() { return password; } - String getEnvironment() { + public String getEnvironment() { return environment; } - String getAftEnvironment() { + public String getAftEnvironment() { return aftEnvironment; } - String getPartner() { + public String getPartner() { return partner; } - String getLatitude() { + public String getLatitude() { return latitude; } - String getLongitude() { + public String getLongitude() { return longitude; } - Map<String, String> getAdditionalProps() { + public Map<String, String> getAdditionalProps() { return additionalProps; } - List<String> getServers() { + public List<String> getServers() { return servers; } - String getTopic() { + public String getTopic() { return topic; } - String getApiKey() { + public String getApiKey() { return apiKey; } - String getApiSecret() { + public String getApiSecret() { return apiSecret; } - String getConsumerGroup() { + public String getConsumerGroup() { return consumerGroup; } - String getConsumerInstance() { + public String getConsumerInstance() { return consumerInstance; } - int getFetchTimeout() { + public int getFetchTimeout() { return fetchTimeout; } - int getFetchLimit() { + public int getFetchLimit() { return fetchLimit; } - boolean isUseHttps() { + public boolean isUseHttps() { return useHttps; } - boolean isAllowSelfSignedCerts() { + public boolean isAllowSelfSignedCerts() { return allowSelfSignedCerts; } @@ -207,6 +208,10 @@ public class BusTopicParams { return (partitionId == null || partitionId.trim().isEmpty()); } + public boolean isManaged() { + return managed; + } + public static class TopicParamsBuilder { BusTopicParams m = new BusTopicParams(); @@ -312,6 +317,10 @@ public class BusTopicParams { return m; } + public TopicParamsBuilder managed(boolean managed) { + this.m.managed = managed; + return this; + } } } |