diff options
15 files changed, 711 insertions, 538 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index e7a21ca1..4e2f4ecf 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -75,7 +75,7 @@ public interface TopicEndpoint extends Startable, Lockable { /** * get the Topic Sources for the given topic name * - * @param topicName the topic name + * @param topicNames the topic name * * @return the Topic Source List * @throws IllegalStateException if the entity is in an invalid state @@ -150,7 +150,6 @@ public interface TopicEndpoint extends Startable, Lockable { * infrastructure type * * @param topicName the topic name - * @param commType communication infrastructure type * * @return the Topic Sink List * @throws IllegalStateException if the entity is in an invalid state, for example multiple diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 26e8d413..08a1db8f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -47,53 +49,50 @@ public interface DmaapTopicSinkFactory { /** * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude * @param additionalProps additional properties to pass to DME2 - * @param managed is this sink endpoint managed? - * + * @param managed is this sink endpoint managed? * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts); + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts); /** * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * + * @param managed is this sink endpoint managed? * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); /** * Creates an DMAAP Topic Sink based on properties files - * + * * @param properties Properties containing initialization values - * * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ @@ -101,10 +100,9 @@ public interface DmaapTopicSinkFactory { /** * Instantiates a new DMAAP Topic Sink - * + * * @param servers list of servers - * @param topic topic name - * + * @param topic topic name * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ @@ -112,7 +110,7 @@ public interface DmaapTopicSinkFactory { /** * Destroys an DMAAP Topic Sink based on a topic - * + * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ @@ -120,18 +118,17 @@ public interface DmaapTopicSinkFactory { /** * gets an DMAAP Topic Sink based on topic name - * + * * @param topic the topic name - * * @return an DMAAP Topic Sink with topic name * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state + * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state */ public DmaapTopicSink get(String topic); /** * Provides a snapshot of the DMAAP Topic Sinks - * + * * @return a list of the DMAAP Topic Sinks */ public List<DmaapTopicSink> inventory(); @@ -163,9 +160,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts) { + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); @@ -176,9 +173,23 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return dmaapTopicWriters.get(topic); } - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps, - useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .partitionId(partitionKey) + .environment(environment) + .aftEnvironment(aftEnvironment) + .partner(partner) + .latitude(latitude) + .longitude(longitude) + .additionalProps(additionalProps) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicWriters.put(topic, dmaapTopicSink); @@ -189,7 +200,8 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + String password, String partitionKey, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); @@ -200,8 +212,17 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return dmaapTopicWriters.get(topic); } - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .partitionId(partitionKey) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicWriters.put(topic, dmaapTopicSink); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index 4285b3a9..11dfd292 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -204,7 +205,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return dmaapTopicSources.get(topic); } - DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) @@ -255,7 +256,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { } DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + new SingleThreadedDmaapTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java index a522e2c5..9d1bd8ad 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -141,8 +143,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { return uebTopicSinks.get(topic); } - UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey, - useHttps, allowSelfSignedCerts); + UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .partitionId(partitionKey) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { uebTopicSinks.put(topic, uebTopicWriter); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index 4c3cbbf8..8d3f28e9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -161,7 +162,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { return uebTopicSources.get(topic); } - UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) @@ -366,5 +367,4 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { builder.append("IndexedUebTopicSourceFactory []"); return builder.toString(); } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 636dc6e3..6d34d32b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +33,6 @@ import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; /** * Wrapper around libraries to consume from message bus - * */ public interface BusConsumer { @@ -67,7 +66,7 @@ public interface BusConsumer { /** * Sets the server-side filter. - * + * * @param filter new filter value, or {@code null} * @throws IllegalArgumentException if the consumer cannot be built with the new filter */ @@ -116,53 +115,47 @@ public interface BusConsumer { /** * Cambria Consumer Wrapper + * BusTopicParam object contains the following parameters + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams * @throws GeneralSecurityException * @throws MalformedURLException */ - public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean useSelfSignedCerts) { - this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, useHttps, useSelfSignedCerts); - } + public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { - - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); this.builder = new CambriaClientBuilders.ConsumerBuilder(); - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) + .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) + .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(fetchTimeout + 30000); - if (useHttps) { + if (busTopicParams.isUseHttps()) { builder.usingHttps(); - if (useSelfSignedCerts) { + if (busTopicParams.isAllowSelfSignedCerts()) { builder.allowSelfSignedCertificates(); } } - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { + builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); + if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { + builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); } try { @@ -282,34 +275,36 @@ public interface BusConsumer { /** * MR Consumer Wrapper + * <p> + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * username AAF Login + * password AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed attributes * @throws MalformedURLException */ - public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit) throws MalformedURLException { + public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); - if (topic == null || topic.isEmpty()) { + if (busTopicParams.isTopicNullOrEmpty()) { throw new IllegalArgumentException("No topic for DMaaP"); } - this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, null, apiKey, apiSecret); + this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(), + busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(), + busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null, + busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - this.consumer.setUsername(username); - this.consumer.setPassword(password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); } @Override @@ -374,29 +369,29 @@ public interface BusConsumer { private final Properties props; /** + * BusTopicParams contain the following parameters * MR Consumer Wrapper + * <p> + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * aafLogin AAF Login + * aafPassword AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed params * @throws MalformedURLException */ - public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps) throws MalformedURLException { + public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { + if (busTopicParams.isServersNullOrEmpty()) { throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); } @@ -404,13 +399,13 @@ public interface BusConsumer { props = new Properties(); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(servers.get(0) + ":3905"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); } else { props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(servers.get(0) + ":3904"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); } this.consumer.setProps(props); @@ -434,70 +429,72 @@ public interface BusConsumer { private final Properties props; - public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, - String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { - + public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); - final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + final String dme2RouteOffer = busTopicParams.getAdditionalProps() + .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + if (busTopicParams.isEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + if (busTopicParams.isAftEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + if (busTopicParams.isLatitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + if (busTopicParams.isLongitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((dme2Partner == null || dme2Partner.isEmpty()) + if ((busTopicParams.isPartnerNullOrEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } - final String serviceName = servers.get(0); + final String serviceName = busTopicParams.getServers().get(0); this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); props = new Properties(); props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); + props.setProperty("username", busTopicParams.getUserName()); + props.setProperty("password", busTopicParams.getPassword()); /* These are required, no defaults */ - props.setProperty("topic", topic); + props.setProperty("topic", busTopicParams.getTopic()); - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); /* These are optional, will default to these values if not set in additionalProps */ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); @@ -511,7 +508,7 @@ public interface BusConsumer { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "GET"); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); } else { @@ -520,8 +517,8 @@ public interface BusConsumer { props.setProperty("contenttype", "application/json"); - if (additionalProps != null) { - for (Map.Entry<String, String> entry : additionalProps.entrySet()) { + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { props.put(entry.getKey(), entry.getValue()); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 9db9131c..348100ab 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +48,7 @@ public interface BusPublisher { /** * sends a message * - * @param partition id + * @param partitionId id * @param message the message * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided @@ -72,23 +73,17 @@ public interface BusPublisher { @JsonIgnore protected volatile CambriaBatchingPublisher publisher; - public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - boolean useHttps) { - this(servers, topic, apiKey, apiSecret, null, null, useHttps, false); - } - - public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String username, String password, boolean useHttps, boolean selfSignedCerts) { + public CambriaPublisherWrapper(BusTopicParams busTopicParams) { PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - builder.usingHosts(servers).onTopic(topic); + builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); // Set read timeout to 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(30000); - if (useHttps) { - if (selfSignedCerts) { + if (busTopicParams.isUseHttps()) { + if (busTopicParams.isAllowSelfSignedCerts()) { builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); } else { builder.withConnectionType(ConnectionType.HTTPS); @@ -96,12 +91,12 @@ public interface BusPublisher { } - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { + builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); + if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { + builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); } try { @@ -297,55 +292,60 @@ public interface BusPublisher { } public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password, - String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude, - Map<String, String> additionalProps, boolean useHttps) { - - super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); - - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { + + super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), + busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); + String dme2RouteOffer = null; + if (busTopicParams.isAdditionalPropsValid()) { + dme2RouteOffer = busTopicParams.getAdditionalProps().get( + DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + } - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + if (busTopicParams.isEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + if (busTopicParams.isAftEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + if (busTopicParams.isLatitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + if (busTopicParams.isLongitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((dme2Partner == null || dme2Partner.isEmpty()) - && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + if ((busTopicParams.isPartnerNullOrEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } - String serviceName = servers.get(0); + String serviceName = busTopicParams.getServers().get(0); /* These are required, no defaults */ - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); // ServiceName also a default, found in additionalProps @@ -361,7 +361,7 @@ public interface BusPublisher { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "POST"); - for (Map.Entry<String, String> entry : additionalProps.entrySet()) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { String key = entry.getKey(); String value = entry.getValue(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index 7f4c0ddd..08993126 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -52,25 +52,22 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { /** * Instantiates a new Bus Topic Base * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams * @return a Bus Topic Base * @throws IllegalArgumentException if invalid parameters are present */ - public BusTopicBase(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps, - boolean allowSelfSignedCerts) { - - super(servers, topic); - - this.apiKey = apiKey; - this.apiSecret = apiSecret; - this.useHttps = useHttps; - this.allowSelfSignedCerts = allowSelfSignedCerts; + public BusTopicBase(BusTopicParams busTopicParams) { + super(busTopicParams.getServers(), busTopicParams.getTopic()); + this.apiKey = busTopicParams.getApiKey(); + this.apiSecret = busTopicParams.getApiSecret(); + this.useHttps = busTopicParams.isUseHttps(); + this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java new file mode 100644 index 00000000..ffefcbf2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -0,0 +1,316 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; +import java.util.Map; + +/** + * Member variables of this Params class are as follows + * servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * consumerGroup DMaaP Reader Consumer Group + * consumerInstance DMaaP Reader Instance + * fetchTimeout DMaaP fetch timeout + * fetchLimit DMaaP fetch limit + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + */ +public class BusTopicParams { + + public static TopicParamsBuilder builder() { + return new TopicParamsBuilder(); + } + + private List<String> servers; + private String topic; + private String apiKey; + private String apiSecret; + private String consumerGroup; + private String consumerInstance; + private int fetchTimeout; + private int fetchLimit; + private boolean useHttps; + private boolean allowSelfSignedCerts; + + private String userName; + private String password; + private String environment; + private String aftEnvironment; + private String partner; + private String latitude; + private String longitude; + private Map<String, String> additionalProps; + private String partitionId; + + String getPartitionId() { + return partitionId; + } + + String getUserName() { + return userName; + } + + String getPassword() { + return password; + } + + String getEnvironment() { + return environment; + } + + String getAftEnvironment() { + return aftEnvironment; + } + + String getPartner() { + return partner; + } + + String getLatitude() { + return latitude; + } + + String getLongitude() { + return longitude; + } + + Map<String, String> getAdditionalProps() { + return additionalProps; + } + + List<String> getServers() { + return servers; + } + + String getTopic() { + return topic; + } + + String getApiKey() { + return apiKey; + } + + String getApiSecret() { + return apiSecret; + } + + String getConsumerGroup() { + return consumerGroup; + } + + String getConsumerInstance() { + return consumerInstance; + } + + int getFetchTimeout() { + return fetchTimeout; + } + + int getFetchLimit() { + return fetchLimit; + } + + boolean isUseHttps() { + return useHttps; + } + + boolean isAllowSelfSignedCerts() { + return allowSelfSignedCerts; + } + + boolean isEnvironmentNullOrEmpty() { + return (environment == null || environment.trim().isEmpty()); + } + + boolean isAftEnvironmentNullOrEmpty() { + return (aftEnvironment == null || aftEnvironment.trim().isEmpty()); + } + + boolean isLatitudeNullOrEmpty() { + return (latitude == null || latitude.trim().isEmpty()); + } + + boolean isLongitudeNullOrEmpty() { + return (longitude == null || longitude.trim().isEmpty()); + } + + boolean isConsumerInstanceNullOrEmpty() { + return (consumerInstance == null || consumerInstance.trim().isEmpty()); + } + + boolean isConsumerGroupNullOrEmpty() { + return (consumerGroup == null || consumerGroup.trim().isEmpty()); + } + + boolean isApiKeyValid() { + return !(apiKey == null || apiKey.trim().isEmpty()); + } + + boolean isApiSecretValid() { + return !(apiSecret == null || apiSecret.trim().isEmpty()); + } + + boolean isUserNameValid() { + return !(userName == null || userName.trim().isEmpty()); + } + + boolean isPasswordValid() { + return !(password == null || password.trim().isEmpty()); + } + + boolean isPartnerNullOrEmpty() { + return (partner == null || partner.trim().isEmpty()); + } + + boolean isServersNullOrEmpty() { + return (servers == null || servers.isEmpty() + || (servers.size() == 1 && ("".equals(servers.get(0))))); + } + + boolean isAdditionalPropsValid() { + return additionalProps != null; + } + + boolean isTopicNullOrEmpty() { + return (topic == null || topic.trim().isEmpty()); + } + + boolean isPartitionIdNullOrEmpty() { + return (partitionId == null || partitionId.trim().isEmpty()); + } + + public static class TopicParamsBuilder { + BusTopicParams m = new BusTopicParams(); + + private TopicParamsBuilder() { + } + + public TopicParamsBuilder servers(List<String> servers) { + this.m.servers = servers; + return this; + } + + public TopicParamsBuilder topic(String topic) { + this.m.topic = topic; + return this; + } + + public TopicParamsBuilder apiKey(String apiKey) { + this.m.apiKey = apiKey; + return this; + } + + public TopicParamsBuilder apiSecret(String apiSecret) { + this.m.apiSecret = apiSecret; + return this; + } + + public TopicParamsBuilder consumerGroup(String consumerGroup) { + this.m.consumerGroup = consumerGroup; + return this; + } + + public TopicParamsBuilder consumerInstance(String consumerInstance) { + this.m.consumerInstance = consumerInstance; + return this; + } + + public TopicParamsBuilder fetchTimeout(int fetchTimeout) { + this.m.fetchTimeout = fetchTimeout; + return this; + } + + public TopicParamsBuilder fetchLimit(int fetchLimit) { + this.m.fetchLimit = fetchLimit; + return this; + } + + public TopicParamsBuilder useHttps(boolean useHttps) { + this.m.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { + this.m.allowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public TopicParamsBuilder userName(String userName) { + this.m.userName = userName; + return this; + } + + public TopicParamsBuilder password(String password) { + this.m.password = password; + return this; + } + + public TopicParamsBuilder environment(String environment) { + this.m.environment = environment; + return this; + } + + public TopicParamsBuilder aftEnvironment(String aftEnvironment) { + this.m.aftEnvironment = aftEnvironment; + return this; + } + + public TopicParamsBuilder partner(String partner) { + this.m.partner = partner; + return this; + } + + public TopicParamsBuilder latitude(String latitude) { + this.m.latitude = latitude; + return this; + } + + public TopicParamsBuilder longitude(String longitude) { + this.m.longitude = longitude; + return this; + } + + public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) { + this.m.additionalProps = additionalProps; + return this; + } + + public TopicParamsBuilder partitionId(String partitionId) { + this.m.partitionId = partitionId; + return this; + } + + public BusTopicParams build() { + return m; + } + + } +} + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index f3c736da..5493468a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,23 +53,24 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi /** * constructor for abstract sink - * - * @param servers servers - * @param topic topic - * @param apiKey api secret - * @param apiSecret api secret - * @param partitionId partition id - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams contains below listed attributes + * servers servers + * topic topic + * apiKey api secret + * apiSecret api secret + * partitionId partition id + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException in invalid parameters are passed in */ - public InlineBusTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId, - boolean useHttps, boolean allowSelfSignedCerts) { + public InlineBusTopicSink(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); + super(busTopicParams); - if (partitionId == null || partitionId.isEmpty()) { + if (busTopicParams.isPartitionIdNullOrEmpty()) { this.partitionId = UUID.randomUUID().toString(); + } else { + this.partitionId = busTopicParams.getPartitionId(); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index 3ea7185e..3dd40312 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -48,65 +49,67 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop protected Map<String, String> additionalProps = null; /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param environment DME2 Environment - * @param aftEnvironment DME2 AFT Environment - * @param partner DME2 Partner - * @param latitude DME2 Latitude - * @param longitude DME2 Longitude - * @param additionalProps Additional properties to pass to DME2 - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * BusTopicParams contains the below mentioned attributes + * servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams Contains the above mentioned parameters * @throws IllegalArgumentException An invalid parameter passed in */ - public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps, - boolean allowSelfSignedCerts) { + public InlineDmaapTopicSink(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + super(busTopicParams); - this.userName = userName; - this.password = password; + this.userName = busTopicParams.getUserName(); + this.password = busTopicParams.getPassword(); - this.environment = environment; - this.aftEnvironment = aftEnvironment; - this.partner = partner; + this.environment = busTopicParams.getEnvironment(); + this.aftEnvironment = busTopicParams.getAftEnvironment(); + this.partner = busTopicParams.getPartner(); - this.latitude = latitude; - this.longitude = longitude; + this.latitude = busTopicParams.getLatitude(); + this.longitude = busTopicParams.getLongitude(); - this.additionalProps = additionalProps; - } - - public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); - - this.userName = userName; - this.password = password; + this.additionalProps = busTopicParams.getAdditionalProps(); } @Override public void init() { if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts); + this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else { - this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, - this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, - this.additionalProps, this.useHttps); + this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .userName(this.userName) + .password(this.password) + .environment(this.environment) + .aftEnvironment(this.aftEnvironment) + .partner(this.partner) + .latitude(this.latitude) + .longitude(this.longitude) + .additionalProps(this.additionalProps) + .useHttps(this.useHttps) + .build()); } logger.info("{}: DMAAP SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index fefe6493..218e44b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,21 +40,21 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); /** - * Argument-based UEB Topic Writer instantiation - * - * @param servers list of UEB servers available for publishing - * @param topic the topic to publish to - * @param apiKey the api key (optional) - * @param apiSecret the api secret (optional) - * @param partitionId the partition key (optional, autogenerated if not provided) - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned + * attributes + * + * servers list of UEB servers available for publishing + * topic the topic to publish to + * apiKey the api key (optional) + * apiSecret the api secret (optional) + * partitionId the partition key (optional, autogenerated if not provided) + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected */ - public InlineUebTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId, - boolean useHttps, boolean allowSelfSignedCerts) { - super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); + public InlineUebTopicSink(BusTopicParams busTopicParams) { + super(busTopicParams); } /** @@ -62,8 +63,14 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi @Override public void init() { - this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - null, null, this.useHttps, this.allowSelfSignedCerts); + this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); logger.info("{}: UEB SINK created", this); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 74912cae..400cbfe2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.net.MalformedURLException; -import java.util.List; -import java.util.Map; import java.util.UUID; import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; @@ -85,15 +84,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase */ public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { - super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts()); + super(busTopicParams); - if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) { + if (busTopicParams.isConsumerGroupNullOrEmpty()) { this.consumerGroup = UUID.randomUUID().toString(); } else { this.consumerGroup = busTopicParams.getConsumerGroup(); } - if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) { + if (busTopicParams.isConsumerInstanceNullOrEmpty()) { this.consumerInstance = NetworkUtil.getHostname(); } else { this.consumerInstance = busTopicParams.getConsumerInstance(); @@ -312,225 +311,4 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return fetchLimit; } - /** - * Member variables of this Params class are as follows - * servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * consumerGroup DMaaP Reader Consumer Group - * consumerInstance DMaaP Reader Instance - * fetchTimeout DMaaP fetch timeout - * fetchLimit DMaaP fetch limit - * environment DME2 Environment - * aftEnvironment DME2 AFT Environment - * partner DME2 Partner - * latitude DME2 Latitude - * longitude DME2 Longitude - * additionalProps Additional properties to pass to DME2 - * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow - * - */ - public static class BusTopicParams { - - public static TopicParamsBuilder builder() { - return new TopicParamsBuilder(); - } - private List<String> servers; - private String topic; - private String apiKey; - private String apiSecret; - private String consumerGroup; - private String consumerInstance; - private int fetchTimeout; - private int fetchLimit; - private boolean useHttps; - private boolean allowSelfSignedCerts; - - private String userName; - private String password; - private String environment; - private String aftEnvironment; - private String partner; - private String latitude; - private String longitude; - private Map<String, String> additionalProps; - - public String getUserName() { - return userName; - } - - public String getPassword() { - return password; - } - - public String getEnvironment() { - return environment; - } - - public String getAftEnvironment() { - return aftEnvironment; - } - - public String getPartner() { - return partner; - } - - public String getLatitude() { - return latitude; - } - - public String getLongitude() { - return longitude; - } - - public Map<String, String> getAdditionalProps() { - return additionalProps; - } - - public List<String> getServers() { - return servers; - } - - public String getTopic() { - return topic; - } - - public String getApiKey() { - return apiKey; - } - - public String getApiSecret() { - return apiSecret; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public String getConsumerInstance() { - return consumerInstance; - } - - public int getFetchTimeout() { - return fetchTimeout; - } - - public int getFetchLimit() { - return fetchLimit; - } - - public boolean isUseHttps() { - return useHttps; - } - - public boolean isAllowSelfSignedCerts() { - return allowSelfSignedCerts; - } - - - public static class TopicParamsBuilder { - BusTopicParams m = new BusTopicParams(); - - private TopicParamsBuilder() { - } - - public TopicParamsBuilder servers(List<String> servers) { - this.m.servers = servers; - return this; - } - - public TopicParamsBuilder topic(String topic) { - this.m.topic = topic; - return this; - } - - public TopicParamsBuilder apiKey(String apiKey) { - this.m.apiKey = apiKey; - return this; - } - - public TopicParamsBuilder apiSecret(String apiSecret) { - this.m.apiSecret = apiSecret; - return this; - } - - public TopicParamsBuilder consumerGroup(String consumerGroup) { - this.m.consumerGroup = consumerGroup; - return this; - } - - public TopicParamsBuilder consumerInstance(String consumerInstance) { - this.m.consumerInstance = consumerInstance; - return this; - } - - public TopicParamsBuilder fetchTimeout(int fetchTimeout) { - this.m.fetchTimeout = fetchTimeout; - return this; - } - - public TopicParamsBuilder fetchLimit(int fetchLimit) { - this.m.fetchLimit = fetchLimit; - return this; - } - - public TopicParamsBuilder useHttps(boolean useHttps) { - this.m.useHttps = useHttps; - return this; - } - - public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { - this.m.allowSelfSignedCerts = allowSelfSignedCerts; - return this; - } - - public TopicParamsBuilder userName(String userName) { - this.m.userName = userName; - return this; - } - - public TopicParamsBuilder password(String password) { - this.m.password = password; - return this; - } - - public TopicParamsBuilder environment(String environment) { - this.m.environment = environment; - return this; - } - - public TopicParamsBuilder aftEnvironment(String aftEnvironment) { - this.m.aftEnvironment = aftEnvironment; - return this; - } - - public TopicParamsBuilder partner(String partner) { - this.m.partner = partner; - return this; - } - - public TopicParamsBuilder latitude(String latitude) { - this.m.latitude = latitude; - return this; - } - - public TopicParamsBuilder longitude(String longitude) { - this.m.longitude = longitude; - return this; - } - - public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) { - this.m.additionalProps = additionalProps; - return this; - } - - public BusTopicParams build() { - return m; - } - - } - - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index c6bd5568..65f75aa5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,18 +85,52 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource @Override public void init() throws MalformedURLException { if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, - this.useHttps, this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner, - this.latitude, this.longitude, this.additionalProps, this.useHttps); + this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .environment(this.environment) + .aftEnvironment(this.aftEnvironment) + .partner(this.partner) + .latitude(this.latitude) + .longitude(this.longitude) + .additionalProps(this.additionalProps) + .useHttps(this.useHttps).build()); } logger.info("{}: INITTED", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index 03273a2b..fb20ccc4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,8 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { /** - * - * @param busTopicParams Parameters object containing all the required inputs * + * @param busTopicParams Parameters object containing all the required inputs * @throws IllegalArgumentException An invalid parameter passed in */ @@ -50,9 +50,17 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i */ @Override public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, - this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); } /** |