diff options
Diffstat (limited to 'policy-endpoints')
5 files changed, 307 insertions, 124 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index 96ab6c63..4285b3a9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -76,12 +77,12 @@ public interface DmaapTopicSourceFactory { * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + boolean managed, boolean useHttps, boolean allowSelfSignedCerts); /** * Instantiates a new DMAAP Topic Source - * + * * @param servers list of servers * @param topic topic name * @param apiKey API Key @@ -101,14 +102,14 @@ public interface DmaapTopicSourceFactory { * @param managed is this endpoind managed? * @param useHttps does the connection use HTTPS? * @param allowSelfSignedCerts does connection allow self-signed certificates? - * + * * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String partner, String latitude, String longitude, - Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); /** * Instantiates a new DMAAP Topic Source @@ -203,9 +204,26 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return dmaapTopicSources.get(topic); } - DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, - userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment, - aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .environment(environment) + .aftEnvironment(aftEnvironment) + .partner(partner) + .latitude(latitude) + .longitude(longitude) + .additionalProps(additionalProps) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicSources.put(topic, dmaapTopicSource); @@ -237,8 +255,20 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { } DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, - consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicSources.put(topic, dmaapTopicSource); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index c6cf3095..4c3cbbf8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -63,8 +64,8 @@ public interface UebTopicSourceFactory { * @throws IllegalArgumentException if invalid parameters are present */ public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, - boolean useHttps, boolean allowSelfSignedCerts); + String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean managed, + boolean useHttps, boolean allowSelfSignedCerts); /** * Instantiates a new UEB Topic Source @@ -160,8 +161,18 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { return uebTopicSources.get(topic); } - UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, - consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { uebTopicSources.put(topic, uebTopicSource); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 97ebbd50..74912cae 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.net.MalformedURLException; import java.util.List; +import java.util.Map; import java.util.UUID; import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; @@ -79,46 +80,35 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase /** * - * @param servers Bus servers - * @param topic Bus Topic to be monitored - * @param apiKey Bus API Key (optional) - * @param apiSecret Bus API Secret (optional) - * @param consumerGroup Bus Reader Consumer Group - * @param consumerInstance Bus Reader Instance - * @param fetchTimeout Bus fetch timeout - * @param fetchLimit Bus fetch limit - * @param useHttps does the bus use https - * @param allowSelfSignedCerts are self-signed certificates allowed - * @throws IllegalArgumentException An invalid parameter passed in + * + * @param busTopicParams@throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedBusTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean allowSelfSignedCerts) { + public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); + super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts()); - if (consumerGroup == null || consumerGroup.isEmpty()) { + if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) { this.consumerGroup = UUID.randomUUID().toString(); } else { - this.consumerGroup = consumerGroup; + this.consumerGroup = busTopicParams.getConsumerGroup(); } - if (consumerInstance == null || consumerInstance.isEmpty()) { + if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) { this.consumerInstance = NetworkUtil.getHostname(); } else { - this.consumerInstance = consumerInstance; + this.consumerInstance = busTopicParams.getConsumerInstance(); } - if (fetchTimeout <= 0) { + if (busTopicParams.getFetchTimeout() <= 0) { this.fetchTimeout = NO_TIMEOUT_MS_FETCH; } else { - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); } - if (fetchLimit <= 0) { + if (busTopicParams.getFetchLimit() <= 0) { this.fetchLimit = NO_LIMIT_FETCH; } else { - this.fetchLimit = fetchLimit; + this.fetchLimit = busTopicParams.getFetchLimit(); } } @@ -322,4 +312,225 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return fetchLimit; } + /** + * Member variables of this Params class are as follows + * servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * consumerGroup DMaaP Reader Consumer Group + * consumerInstance DMaaP Reader Instance + * fetchTimeout DMaaP fetch timeout + * fetchLimit DMaaP fetch limit + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * + */ + public static class BusTopicParams { + + public static TopicParamsBuilder builder() { + return new TopicParamsBuilder(); + } + private List<String> servers; + private String topic; + private String apiKey; + private String apiSecret; + private String consumerGroup; + private String consumerInstance; + private int fetchTimeout; + private int fetchLimit; + private boolean useHttps; + private boolean allowSelfSignedCerts; + + private String userName; + private String password; + private String environment; + private String aftEnvironment; + private String partner; + private String latitude; + private String longitude; + private Map<String, String> additionalProps; + + public String getUserName() { + return userName; + } + + public String getPassword() { + return password; + } + + public String getEnvironment() { + return environment; + } + + public String getAftEnvironment() { + return aftEnvironment; + } + + public String getPartner() { + return partner; + } + + public String getLatitude() { + return latitude; + } + + public String getLongitude() { + return longitude; + } + + public Map<String, String> getAdditionalProps() { + return additionalProps; + } + + public List<String> getServers() { + return servers; + } + + public String getTopic() { + return topic; + } + + public String getApiKey() { + return apiKey; + } + + public String getApiSecret() { + return apiSecret; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public String getConsumerInstance() { + return consumerInstance; + } + + public int getFetchTimeout() { + return fetchTimeout; + } + + public int getFetchLimit() { + return fetchLimit; + } + + public boolean isUseHttps() { + return useHttps; + } + + public boolean isAllowSelfSignedCerts() { + return allowSelfSignedCerts; + } + + + public static class TopicParamsBuilder { + BusTopicParams m = new BusTopicParams(); + + private TopicParamsBuilder() { + } + + public TopicParamsBuilder servers(List<String> servers) { + this.m.servers = servers; + return this; + } + + public TopicParamsBuilder topic(String topic) { + this.m.topic = topic; + return this; + } + + public TopicParamsBuilder apiKey(String apiKey) { + this.m.apiKey = apiKey; + return this; + } + + public TopicParamsBuilder apiSecret(String apiSecret) { + this.m.apiSecret = apiSecret; + return this; + } + + public TopicParamsBuilder consumerGroup(String consumerGroup) { + this.m.consumerGroup = consumerGroup; + return this; + } + + public TopicParamsBuilder consumerInstance(String consumerInstance) { + this.m.consumerInstance = consumerInstance; + return this; + } + + public TopicParamsBuilder fetchTimeout(int fetchTimeout) { + this.m.fetchTimeout = fetchTimeout; + return this; + } + + public TopicParamsBuilder fetchLimit(int fetchLimit) { + this.m.fetchLimit = fetchLimit; + return this; + } + + public TopicParamsBuilder useHttps(boolean useHttps) { + this.m.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { + this.m.allowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public TopicParamsBuilder userName(String userName) { + this.m.userName = userName; + return this; + } + + public TopicParamsBuilder password(String password) { + this.m.password = password; + return this; + } + + public TopicParamsBuilder environment(String environment) { + this.m.environment = environment; + return this; + } + + public TopicParamsBuilder aftEnvironment(String aftEnvironment) { + this.m.aftEnvironment = aftEnvironment; + return this; + } + + public TopicParamsBuilder partner(String partner) { + this.m.partner = partner; + return this; + } + + public TopicParamsBuilder latitude(String latitude) { + this.m.latitude = latitude; + return this; + } + + public TopicParamsBuilder longitude(String longitude) { + this.m.longitude = longitude; + return this; + } + + public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) { + this.m.additionalProps = additionalProps; + return this; + } + + public BusTopicParams build() { + return m; + } + + } + + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 8ac41424..c6bd5568 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -21,7 +21,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.net.MalformedURLException; -import java.util.List; import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; @@ -52,81 +51,28 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource /** * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param environment DME2 Environment - * @param aftEnvironment DME2 AFT Environment - * @param partner DME2 Partner - * @param latitude DME2 Latitude - * @param longitude DME2 Longitude - * @param additionalProps Additional properties to pass to DME2 - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * @param busTopicParams Parameters object containing all the required inputs * * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, - String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String partner, String latitude, - String longitude, Map<String, String> additionalProps, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, - allowSelfSignedCerts); - - this.userName = userName; - this.password = password; - - this.environment = environment; - this.aftEnvironment = aftEnvironment; - this.partner = partner; - - this.latitude = latitude; - this.longitude = longitude; - - this.additionalProps = additionalProps; - try { - this.init(); - } catch (Exception e) { - logger.error("ERROR during init of topic {}", this.topic); - throw new IllegalArgumentException(e); - } - } + public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) { - /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * @throws IllegalArgumentException An invalid parameter passed in - */ - public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, - String userName, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) { + super(busTopicParams); + this.userName = busTopicParams.getUserName(); + this.password = busTopicParams.getPassword(); - super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, - allowSelfSignedCerts); + this.environment = busTopicParams.getEnvironment(); + this.aftEnvironment = busTopicParams.getAftEnvironment(); + this.partner = busTopicParams.getPartner(); - this.userName = userName; - this.password = password; + this.latitude = busTopicParams.getLatitude(); + this.longitude = busTopicParams.getLongitude(); + this.additionalProps = busTopicParams.getAdditionalProps(); try { this.init(); } catch (Exception e) { - logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); + logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e); throw new IllegalArgumentException(e); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index b7a20503..03273a2b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -20,8 +20,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import java.util.List; - import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; @@ -33,29 +31,16 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i /** * - * @param servers UEB servers - * @param topic UEB Topic to be monitored - * @param apiKey UEB API Key (optional) - * @param apiSecret UEB API Secret (optional) - * @param consumerGroup UEB Reader Consumer Group - * @param consumerInstance UEB Reader Instance - * @param fetchTimeout UEB fetch timeout - * @param fetchLimit UEB fetch limit - * @param useHttps does topicSource use HTTPS? - * @param allowSelfSignedCerts does topicSource allow self-signed certs? - * + * @param busTopicParams Parameters object containing all the required inputs * * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean allowSelfSignedCerts) { + public SingleThreadedUebTopicSource(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, - allowSelfSignedCerts); + super(busTopicParams); - this.allowSelfSignedCerts = allowSelfSignedCerts; + this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); this.init(); } |