From fb3c2a56b46a95c188ec4eec9c98c7bd0b881097 Mon Sep 17 00:00:00 2001 From: krishnajinka Date: Sun, 5 Aug 2018 16:52:10 +0900 Subject: Apply builder pattern for topic cnsmer prdcr Modify endpoints event bus related classes to use builder pattern in particular apply bus topic params object instead of using parameters as it is. Rework based on commnts Issue-ID: POLICY-1017 Change-Id: I572a72fa525cf4f664eb70d0415be73116499bd2 Signed-off-by: krisjinka --- .../common/endpoints/event/comm/TopicEndpoint.java | 3 +- .../event/comm/bus/DmaapTopicSinkFactory.java | 119 ++++---- .../event/comm/bus/DmaapTopicSourceFactory.java | 7 +- .../event/comm/bus/UebTopicSinkFactory.java | 13 +- .../event/comm/bus/UebTopicSourceFactory.java | 6 +- .../event/comm/bus/internal/BusConsumer.java | 201 +++++++------ .../event/comm/bus/internal/BusPublisher.java | 88 +++--- .../event/comm/bus/internal/BusTopicBase.java | 29 +- .../event/comm/bus/internal/BusTopicParams.java | 316 +++++++++++++++++++++ .../comm/bus/internal/InlineBusTopicSink.java | 26 +- .../comm/bus/internal/InlineDmaapTopicSink.java | 97 ++++--- .../comm/bus/internal/InlineUebTopicSink.java | 41 +-- .../bus/internal/SingleThreadedBusTopicSource.java | 230 +-------------- .../internal/SingleThreadedDmaapTopicSource.java | 55 +++- .../bus/internal/SingleThreadedUebTopicSource.java | 18 +- 15 files changed, 711 insertions(+), 538 deletions(-) create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java (limited to 'policy-endpoints') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index e7a21ca1..4e2f4ecf 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -75,7 +75,7 @@ public interface TopicEndpoint extends Startable, Lockable { /** * get the Topic Sources for the given topic name * - * @param topicName the topic name + * @param topicNames the topic name * * @return the Topic Source List * @throws IllegalStateException if the entity is in an invalid state @@ -150,7 +150,6 @@ public interface TopicEndpoint extends Startable, Lockable { * infrastructure type * * @param topicName the topic name - * @param commType communication infrastructure type * * @return the Topic Sink List * @throws IllegalStateException if the entity is in an invalid state, for example multiple diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 26e8d413..08a1db8f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -47,53 +49,50 @@ public interface DmaapTopicSinkFactory { /** * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password - * @param partitionKey Consumer Group - * @param environment DME2 environment - * @param aftEnvironment DME2 AFT environment - * @param partner DME2 Partner - * @param latitude DME2 latitude - * @param longitude DME2 longitude + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude * @param additionalProps additional properties to pass to DME2 - * @param managed is this sink endpoint managed? - * + * @param managed is this sink endpoint managed? * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts); + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts); /** * Instantiates a new DMAAP Topic Sink - * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param userName AAF user name - * @param password AAF password + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password * @param partitionKey Consumer Group - * @param managed is this sink endpoint managed? - * + * @param managed is this sink endpoint managed? * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); + String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts); /** * Creates an DMAAP Topic Sink based on properties files - * + * * @param properties Properties containing initialization values - * * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ @@ -101,10 +100,9 @@ public interface DmaapTopicSinkFactory { /** * Instantiates a new DMAAP Topic Sink - * + * * @param servers list of servers - * @param topic topic name - * + * @param topic topic name * @return an DMAAP Topic Sink * @throws IllegalArgumentException if invalid parameters are present */ @@ -112,7 +110,7 @@ public interface DmaapTopicSinkFactory { /** * Destroys an DMAAP Topic Sink based on a topic - * + * * @param topic topic name * @throws IllegalArgumentException if invalid parameters are present */ @@ -120,18 +118,17 @@ public interface DmaapTopicSinkFactory { /** * gets an DMAAP Topic Sink based on topic name - * + * * @param topic the topic name - * * @return an DMAAP Topic Sink with topic name * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state + * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state */ public DmaapTopicSink get(String topic); /** * Provides a snapshot of the DMAAP Topic Sinks - * + * * @return a list of the DMAAP Topic Sinks */ public List inventory(); @@ -163,9 +160,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, - boolean allowSelfSignedCerts) { + String password, String partitionKey, String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map additionalProps, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); @@ -176,9 +173,23 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return dmaapTopicWriters.get(topic); } - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps, - useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .partitionId(partitionKey) + .environment(environment) + .aftEnvironment(aftEnvironment) + .partner(partner) + .latitude(latitude) + .longitude(longitude) + .additionalProps(additionalProps) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicWriters.put(topic, dmaapTopicSink); @@ -189,7 +200,8 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) { + String password, String partitionKey, boolean managed, boolean useHttps, + boolean allowSelfSignedCerts) { if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException(MISSING_TOPIC); @@ -200,8 +212,17 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return dmaapTopicWriters.get(topic); } - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, - password, partitionKey, useHttps, allowSelfSignedCerts); + DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(userName) + .password(password) + .partitionId(partitionKey) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { dmaapTopicWriters.put(topic, dmaapTopicSink); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index 4285b3a9..11dfd292 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -204,7 +205,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return dmaapTopicSources.get(topic); } - DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) @@ -255,7 +256,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { } DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + new SingleThreadedDmaapTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java index a522e2c5..9d1bd8ad 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -141,8 +143,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { return uebTopicSinks.get(topic); } - UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey, - useHttps, allowSelfSignedCerts); + UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .partitionId(partitionKey) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); if (managed) { uebTopicSinks.put(topic, uebTopicWriter); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index 4c3cbbf8..8d3f28e9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; @@ -161,7 +162,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { return uebTopicSources.get(topic); } - UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder() + UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder() .servers(servers) .topic(topic) .apiKey(apiKey) @@ -366,5 +367,4 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { builder.append("IndexedUebTopicSourceFactory []"); return builder.toString(); } - } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 636dc6e3..6d34d32b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +33,6 @@ import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; /** * Wrapper around libraries to consume from message bus - * */ public interface BusConsumer { @@ -67,7 +66,7 @@ public interface BusConsumer { /** * Sets the server-side filter. - * + * * @param filter new filter value, or {@code null} * @throws IllegalArgumentException if the consumer cannot be built with the new filter */ @@ -116,53 +115,47 @@ public interface BusConsumer { /** * Cambria Consumer Wrapper + * BusTopicParam object contains the following parameters + * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams * @throws GeneralSecurityException * @throws MalformedURLException */ - public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps, - boolean useSelfSignedCerts) { - this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, useHttps, useSelfSignedCerts); - } + public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - public CambriaConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) { - - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); this.builder = new CambriaClientBuilders.ConsumerBuilder(); - builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) - .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance()) + .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()) + .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit()); // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(fetchTimeout + 30000); - if (useHttps) { + if (busTopicParams.isUseHttps()) { builder.usingHttps(); - if (useSelfSignedCerts) { + if (busTopicParams.isAllowSelfSignedCerts()) { builder.allowSelfSignedCertificates(); } } - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { + builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); + if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { + builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); } try { @@ -282,34 +275,36 @@ public interface BusConsumer { /** * MR Consumer Wrapper + *

+ * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * username AAF Login + * password AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed attributes * @throws MalformedURLException */ - public DmaapConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit) throws MalformedURLException { + public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - this.fetchTimeout = fetchTimeout; + this.fetchTimeout = busTopicParams.getFetchTimeout(); - if (topic == null || topic.isEmpty()) { + if (busTopicParams.isTopicNullOrEmpty()) { throw new IllegalArgumentException("No topic for DMaaP"); } - this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, - fetchLimit, null, apiKey, apiSecret); + this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(), + busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(), + busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null, + busTopicParams.getApiKey(), busTopicParams.getApiSecret()); - this.consumer.setUsername(username); - this.consumer.setPassword(password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); } @Override @@ -374,29 +369,29 @@ public interface BusConsumer { private final Properties props; /** + * BusTopicParams contain the following parameters * MR Consumer Wrapper + *

+ * servers messaging bus hosts + * topic topic + * apiKey API Key + * apiSecret API Secret + * aafLogin AAF Login + * aafPassword AAF Password + * consumerGroup Consumer Group + * consumerInstance Consumer Instance + * fetchTimeout Fetch Timeout + * fetchLimit Fetch Limit * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit + * @param busTopicParams contains above listed params * @throws MalformedURLException */ - public DmaapAafConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, boolean useHttps) throws MalformedURLException { + public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) { + if (busTopicParams.isServersNullOrEmpty()) { throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); } @@ -404,13 +399,13 @@ public interface BusConsumer { props = new Properties(); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(servers.get(0) + ":3905"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905"); } else { props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(servers.get(0) + ":3904"); + this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904"); } this.consumer.setProps(props); @@ -434,70 +429,72 @@ public interface BusConsumer { private final Properties props; - public DmaapDmeConsumerWrapper(List servers, String topic, String apiKey, String apiSecret, - String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude, - String longitude, Map additionalProps, boolean useHttps) throws MalformedURLException { - + public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + super(busTopicParams); - final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + final String dme2RouteOffer = busTopicParams.getAdditionalProps() + .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + if (busTopicParams.isEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + if (busTopicParams.isAftEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + if (busTopicParams.isLatitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + if (busTopicParams.isLongitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((dme2Partner == null || dme2Partner.isEmpty()) + if ((busTopicParams.isPartnerNullOrEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } - final String serviceName = servers.get(0); + final String serviceName = busTopicParams.getServers().get(0); this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); + this.consumer.setUsername(busTopicParams.getUserName()); + this.consumer.setPassword(busTopicParams.getPassword()); props = new Properties(); props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); + props.setProperty("username", busTopicParams.getUserName()); + props.setProperty("password", busTopicParams.getPassword()); /* These are required, no defaults */ - props.setProperty("topic", topic); + props.setProperty("topic", busTopicParams.getTopic()); - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); /* These are optional, will default to these values if not set in additionalProps */ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); @@ -511,7 +508,7 @@ public interface BusConsumer { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "GET"); - if (useHttps) { + if (busTopicParams.isUseHttps()) { props.setProperty(PROTOCOL_PROP, "https"); } else { @@ -520,8 +517,8 @@ public interface BusConsumer { props.setProperty("contenttype", "application/json"); - if (additionalProps != null) { - for (Map.Entry entry : additionalProps.entrySet()) { + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { props.put(entry.getKey(), entry.getValue()); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 9db9131c..348100ab 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +48,7 @@ public interface BusPublisher { /** * sends a message * - * @param partition id + * @param partitionId id * @param message the message * @return true if success, false otherwise * @throws IllegalArgumentException if no message provided @@ -72,23 +73,17 @@ public interface BusPublisher { @JsonIgnore protected volatile CambriaBatchingPublisher publisher; - public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, - boolean useHttps) { - this(servers, topic, apiKey, apiSecret, null, null, useHttps, false); - } - - public CambriaPublisherWrapper(List servers, String topic, String apiKey, String apiSecret, - String username, String password, boolean useHttps, boolean selfSignedCerts) { + public CambriaPublisherWrapper(BusTopicParams busTopicParams) { PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - builder.usingHosts(servers).onTopic(topic); + builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic()); // Set read timeout to 30 seconds (TBD: this should be configurable) builder.withSocketTimeout(30000); - if (useHttps) { - if (selfSignedCerts) { + if (busTopicParams.isUseHttps()) { + if (busTopicParams.isAllowSelfSignedCerts()) { builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION); } else { builder.withConnectionType(ConnectionType.HTTPS); @@ -96,12 +91,12 @@ public interface BusPublisher { } - if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); + if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) { + builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret()); } - if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { - builder.authenticatedByHttp(username, password); + if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) { + builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword()); } try { @@ -297,55 +292,60 @@ public interface BusPublisher { } public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { - public DmaapDmePublisherWrapper(List servers, String topic, String username, String password, - String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude, - Map additionalProps, boolean useHttps) { - - super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); - - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) { + + super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), + busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); + String dme2RouteOffer = null; + if (busTopicParams.isAdditionalPropsValid()) { + dme2RouteOffer = busTopicParams.getAdditionalProps().get( + DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + } - if (environment == null || environment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + if (busTopicParams.isEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); } - if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + if (busTopicParams.isAftEnvironmentNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); } - if (latitude == null || latitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + if (busTopicParams.isLatitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); } - if (longitude == null || longitude.isEmpty()) { - throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + if (busTopicParams.isLongitudeNullOrEmpty()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); } - if ((dme2Partner == null || dme2Partner.isEmpty()) - && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + if ((busTopicParams.isPartnerNullOrEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) { throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); } - String serviceName = servers.get(0); + String serviceName = busTopicParams.getServers().get(0); /* These are required, no defaults */ - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + props.setProperty("Environment", busTopicParams.getEnvironment()); + props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - if (dme2Partner != null) { - props.setProperty("Partner", dme2Partner); + if (busTopicParams.getPartner() != null) { + props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); + props.setProperty("Latitude", busTopicParams.getLatitude()); + props.setProperty("Longitude", busTopicParams.getLongitude()); // ServiceName also a default, found in additionalProps @@ -361,7 +361,7 @@ public interface BusPublisher { props.setProperty("TransportType", "DME2"); props.setProperty("MethodType", "POST"); - for (Map.Entry entry : additionalProps.entrySet()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { String key = entry.getKey(); String value = entry.getValue(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index 7f4c0ddd..08993126 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -52,25 +52,22 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { /** * Instantiates a new Bus Topic Base * - * @param servers list of servers - * @param topic topic name - * @param apiKey API Key - * @param apiSecret API Secret - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * servers list of servers + * topic topic name + * apiKey API Key + * apiSecret API Secret + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams * @return a Bus Topic Base * @throws IllegalArgumentException if invalid parameters are present */ - public BusTopicBase(List servers, String topic, String apiKey, String apiSecret, boolean useHttps, - boolean allowSelfSignedCerts) { - - super(servers, topic); - - this.apiKey = apiKey; - this.apiSecret = apiSecret; - this.useHttps = useHttps; - this.allowSelfSignedCerts = allowSelfSignedCerts; + public BusTopicBase(BusTopicParams busTopicParams) { + super(busTopicParams.getServers(), busTopicParams.getTopic()); + this.apiKey = busTopicParams.getApiKey(); + this.apiSecret = busTopicParams.getApiSecret(); + this.useHttps = busTopicParams.isUseHttps(); + this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java new file mode 100644 index 00000000..ffefcbf2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -0,0 +1,316 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import java.util.List; +import java.util.Map; + +/** + * Member variables of this Params class are as follows + * servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * consumerGroup DMaaP Reader Consumer Group + * consumerInstance DMaaP Reader Instance + * fetchTimeout DMaaP fetch timeout + * fetchLimit DMaaP fetch limit + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + */ +public class BusTopicParams { + + public static TopicParamsBuilder builder() { + return new TopicParamsBuilder(); + } + + private List servers; + private String topic; + private String apiKey; + private String apiSecret; + private String consumerGroup; + private String consumerInstance; + private int fetchTimeout; + private int fetchLimit; + private boolean useHttps; + private boolean allowSelfSignedCerts; + + private String userName; + private String password; + private String environment; + private String aftEnvironment; + private String partner; + private String latitude; + private String longitude; + private Map additionalProps; + private String partitionId; + + String getPartitionId() { + return partitionId; + } + + String getUserName() { + return userName; + } + + String getPassword() { + return password; + } + + String getEnvironment() { + return environment; + } + + String getAftEnvironment() { + return aftEnvironment; + } + + String getPartner() { + return partner; + } + + String getLatitude() { + return latitude; + } + + String getLongitude() { + return longitude; + } + + Map getAdditionalProps() { + return additionalProps; + } + + List getServers() { + return servers; + } + + String getTopic() { + return topic; + } + + String getApiKey() { + return apiKey; + } + + String getApiSecret() { + return apiSecret; + } + + String getConsumerGroup() { + return consumerGroup; + } + + String getConsumerInstance() { + return consumerInstance; + } + + int getFetchTimeout() { + return fetchTimeout; + } + + int getFetchLimit() { + return fetchLimit; + } + + boolean isUseHttps() { + return useHttps; + } + + boolean isAllowSelfSignedCerts() { + return allowSelfSignedCerts; + } + + boolean isEnvironmentNullOrEmpty() { + return (environment == null || environment.trim().isEmpty()); + } + + boolean isAftEnvironmentNullOrEmpty() { + return (aftEnvironment == null || aftEnvironment.trim().isEmpty()); + } + + boolean isLatitudeNullOrEmpty() { + return (latitude == null || latitude.trim().isEmpty()); + } + + boolean isLongitudeNullOrEmpty() { + return (longitude == null || longitude.trim().isEmpty()); + } + + boolean isConsumerInstanceNullOrEmpty() { + return (consumerInstance == null || consumerInstance.trim().isEmpty()); + } + + boolean isConsumerGroupNullOrEmpty() { + return (consumerGroup == null || consumerGroup.trim().isEmpty()); + } + + boolean isApiKeyValid() { + return !(apiKey == null || apiKey.trim().isEmpty()); + } + + boolean isApiSecretValid() { + return !(apiSecret == null || apiSecret.trim().isEmpty()); + } + + boolean isUserNameValid() { + return !(userName == null || userName.trim().isEmpty()); + } + + boolean isPasswordValid() { + return !(password == null || password.trim().isEmpty()); + } + + boolean isPartnerNullOrEmpty() { + return (partner == null || partner.trim().isEmpty()); + } + + boolean isServersNullOrEmpty() { + return (servers == null || servers.isEmpty() + || (servers.size() == 1 && ("".equals(servers.get(0))))); + } + + boolean isAdditionalPropsValid() { + return additionalProps != null; + } + + boolean isTopicNullOrEmpty() { + return (topic == null || topic.trim().isEmpty()); + } + + boolean isPartitionIdNullOrEmpty() { + return (partitionId == null || partitionId.trim().isEmpty()); + } + + public static class TopicParamsBuilder { + BusTopicParams m = new BusTopicParams(); + + private TopicParamsBuilder() { + } + + public TopicParamsBuilder servers(List servers) { + this.m.servers = servers; + return this; + } + + public TopicParamsBuilder topic(String topic) { + this.m.topic = topic; + return this; + } + + public TopicParamsBuilder apiKey(String apiKey) { + this.m.apiKey = apiKey; + return this; + } + + public TopicParamsBuilder apiSecret(String apiSecret) { + this.m.apiSecret = apiSecret; + return this; + } + + public TopicParamsBuilder consumerGroup(String consumerGroup) { + this.m.consumerGroup = consumerGroup; + return this; + } + + public TopicParamsBuilder consumerInstance(String consumerInstance) { + this.m.consumerInstance = consumerInstance; + return this; + } + + public TopicParamsBuilder fetchTimeout(int fetchTimeout) { + this.m.fetchTimeout = fetchTimeout; + return this; + } + + public TopicParamsBuilder fetchLimit(int fetchLimit) { + this.m.fetchLimit = fetchLimit; + return this; + } + + public TopicParamsBuilder useHttps(boolean useHttps) { + this.m.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { + this.m.allowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public TopicParamsBuilder userName(String userName) { + this.m.userName = userName; + return this; + } + + public TopicParamsBuilder password(String password) { + this.m.password = password; + return this; + } + + public TopicParamsBuilder environment(String environment) { + this.m.environment = environment; + return this; + } + + public TopicParamsBuilder aftEnvironment(String aftEnvironment) { + this.m.aftEnvironment = aftEnvironment; + return this; + } + + public TopicParamsBuilder partner(String partner) { + this.m.partner = partner; + return this; + } + + public TopicParamsBuilder latitude(String latitude) { + this.m.latitude = latitude; + return this; + } + + public TopicParamsBuilder longitude(String longitude) { + this.m.longitude = longitude; + return this; + } + + public TopicParamsBuilder additionalProps(Map additionalProps) { + this.m.additionalProps = additionalProps; + return this; + } + + public TopicParamsBuilder partitionId(String partitionId) { + this.m.partitionId = partitionId; + return this; + } + + public BusTopicParams build() { + return m; + } + + } +} + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index f3c736da..5493468a 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,23 +53,24 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi /** * constructor for abstract sink - * - * @param servers servers - * @param topic topic - * @param apiKey api secret - * @param apiSecret api secret - * @param partitionId partition id - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams contains below listed attributes + * servers servers + * topic topic + * apiKey api secret + * apiSecret api secret + * partitionId partition id + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException in invalid parameters are passed in */ - public InlineBusTopicSink(List servers, String topic, String apiKey, String apiSecret, String partitionId, - boolean useHttps, boolean allowSelfSignedCerts) { + public InlineBusTopicSink(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); + super(busTopicParams); - if (partitionId == null || partitionId.isEmpty()) { + if (busTopicParams.isPartitionIdNullOrEmpty()) { this.partitionId = UUID.randomUUID().toString(); + } else { + this.partitionId = busTopicParams.getPartitionId(); } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index 3ea7185e..3dd40312 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -48,65 +49,67 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop protected Map additionalProps = null; /** - * - * @param servers DMaaP servers - * @param topic DMaaP Topic to be monitored - * @param apiKey DMaaP API Key (optional) - * @param apiSecret DMaaP API Secret (optional) - * @param consumerGroup DMaaP Reader Consumer Group - * @param consumerInstance DMaaP Reader Instance - * @param fetchTimeout DMaaP fetch timeout - * @param fetchLimit DMaaP fetch limit - * @param environment DME2 Environment - * @param aftEnvironment DME2 AFT Environment - * @param partner DME2 Partner - * @param latitude DME2 Latitude - * @param longitude DME2 Longitude - * @param additionalProps Additional properties to pass to DME2 - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * BusTopicParams contains the below mentioned attributes + * servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams Contains the above mentioned parameters * @throws IllegalArgumentException An invalid parameter passed in */ - public InlineDmaapTopicSink(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, String environment, String aftEnvironment, String partner, - String latitude, String longitude, Map additionalProps, boolean useHttps, - boolean allowSelfSignedCerts) { + public InlineDmaapTopicSink(BusTopicParams busTopicParams) { - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + super(busTopicParams); - this.userName = userName; - this.password = password; + this.userName = busTopicParams.getUserName(); + this.password = busTopicParams.getPassword(); - this.environment = environment; - this.aftEnvironment = aftEnvironment; - this.partner = partner; + this.environment = busTopicParams.getEnvironment(); + this.aftEnvironment = busTopicParams.getAftEnvironment(); + this.partner = busTopicParams.getPartner(); - this.latitude = latitude; - this.longitude = longitude; + this.latitude = busTopicParams.getLatitude(); + this.longitude = busTopicParams.getLongitude(); - this.additionalProps = additionalProps; - } - - public InlineDmaapTopicSink(List servers, String topic, String apiKey, String apiSecret, String userName, - String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) { - - super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); - - this.userName = userName; - this.password = password; + this.additionalProps = busTopicParams.getAdditionalProps(); } @Override public void init() { if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts); + this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else { - this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, - this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude, - this.additionalProps, this.useHttps); + this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .userName(this.userName) + .password(this.password) + .environment(this.environment) + .aftEnvironment(this.aftEnvironment) + .partner(this.partner) + .latitude(this.latitude) + .longitude(this.longitude) + .additionalProps(this.additionalProps) + .useHttps(this.useHttps) + .build()); } logger.info("{}: DMAAP SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index fefe6493..218e44b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -3,13 +3,14 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -39,21 +40,21 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class); /** - * Argument-based UEB Topic Writer instantiation - * - * @param servers list of UEB servers available for publishing - * @param topic the topic to publish to - * @param apiKey the api key (optional) - * @param apiSecret the api secret (optional) - * @param partitionId the partition key (optional, autogenerated if not provided) - * @param useHttps does connection use HTTPS? - * @param allowSelfSignedCerts are self-signed certificates allow - * + * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned + * attributes + * + * servers list of UEB servers available for publishing + * topic the topic to publish to + * apiKey the api key (optional) + * apiSecret the api secret (optional) + * partitionId the partition key (optional, autogenerated if not provided) + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected */ - public InlineUebTopicSink(List servers, String topic, String apiKey, String apiSecret, String partitionId, - boolean useHttps, boolean allowSelfSignedCerts) { - super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); + public InlineUebTopicSink(BusTopicParams busTopicParams) { + super(busTopicParams); } /** @@ -62,8 +63,14 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi @Override public void init() { - this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - null, null, this.useHttps, this.allowSelfSignedCerts); + this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); logger.info("{}: UEB SINK created", this); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 74912cae..400cbfe2 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +22,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.net.MalformedURLException; -import java.util.List; -import java.util.Map; import java.util.UUID; import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; @@ -85,15 +84,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase */ public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { - super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts()); + super(busTopicParams); - if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) { + if (busTopicParams.isConsumerGroupNullOrEmpty()) { this.consumerGroup = UUID.randomUUID().toString(); } else { this.consumerGroup = busTopicParams.getConsumerGroup(); } - if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) { + if (busTopicParams.isConsumerInstanceNullOrEmpty()) { this.consumerInstance = NetworkUtil.getHostname(); } else { this.consumerInstance = busTopicParams.getConsumerInstance(); @@ -312,225 +311,4 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return fetchLimit; } - /** - * Member variables of this Params class are as follows - * servers DMaaP servers - * topic DMaaP Topic to be monitored - * apiKey DMaaP API Key (optional) - * apiSecret DMaaP API Secret (optional) - * consumerGroup DMaaP Reader Consumer Group - * consumerInstance DMaaP Reader Instance - * fetchTimeout DMaaP fetch timeout - * fetchLimit DMaaP fetch limit - * environment DME2 Environment - * aftEnvironment DME2 AFT Environment - * partner DME2 Partner - * latitude DME2 Latitude - * longitude DME2 Longitude - * additionalProps Additional properties to pass to DME2 - * useHttps does connection use HTTPS? - * allowSelfSignedCerts are self-signed certificates allow - * - */ - public static class BusTopicParams { - - public static TopicParamsBuilder builder() { - return new TopicParamsBuilder(); - } - private List servers; - private String topic; - private String apiKey; - private String apiSecret; - private String consumerGroup; - private String consumerInstance; - private int fetchTimeout; - private int fetchLimit; - private boolean useHttps; - private boolean allowSelfSignedCerts; - - private String userName; - private String password; - private String environment; - private String aftEnvironment; - private String partner; - private String latitude; - private String longitude; - private Map additionalProps; - - public String getUserName() { - return userName; - } - - public String getPassword() { - return password; - } - - public String getEnvironment() { - return environment; - } - - public String getAftEnvironment() { - return aftEnvironment; - } - - public String getPartner() { - return partner; - } - - public String getLatitude() { - return latitude; - } - - public String getLongitude() { - return longitude; - } - - public Map getAdditionalProps() { - return additionalProps; - } - - public List getServers() { - return servers; - } - - public String getTopic() { - return topic; - } - - public String getApiKey() { - return apiKey; - } - - public String getApiSecret() { - return apiSecret; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - public String getConsumerInstance() { - return consumerInstance; - } - - public int getFetchTimeout() { - return fetchTimeout; - } - - public int getFetchLimit() { - return fetchLimit; - } - - public boolean isUseHttps() { - return useHttps; - } - - public boolean isAllowSelfSignedCerts() { - return allowSelfSignedCerts; - } - - - public static class TopicParamsBuilder { - BusTopicParams m = new BusTopicParams(); - - private TopicParamsBuilder() { - } - - public TopicParamsBuilder servers(List servers) { - this.m.servers = servers; - return this; - } - - public TopicParamsBuilder topic(String topic) { - this.m.topic = topic; - return this; - } - - public TopicParamsBuilder apiKey(String apiKey) { - this.m.apiKey = apiKey; - return this; - } - - public TopicParamsBuilder apiSecret(String apiSecret) { - this.m.apiSecret = apiSecret; - return this; - } - - public TopicParamsBuilder consumerGroup(String consumerGroup) { - this.m.consumerGroup = consumerGroup; - return this; - } - - public TopicParamsBuilder consumerInstance(String consumerInstance) { - this.m.consumerInstance = consumerInstance; - return this; - } - - public TopicParamsBuilder fetchTimeout(int fetchTimeout) { - this.m.fetchTimeout = fetchTimeout; - return this; - } - - public TopicParamsBuilder fetchLimit(int fetchLimit) { - this.m.fetchLimit = fetchLimit; - return this; - } - - public TopicParamsBuilder useHttps(boolean useHttps) { - this.m.useHttps = useHttps; - return this; - } - - public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { - this.m.allowSelfSignedCerts = allowSelfSignedCerts; - return this; - } - - public TopicParamsBuilder userName(String userName) { - this.m.userName = userName; - return this; - } - - public TopicParamsBuilder password(String password) { - this.m.password = password; - return this; - } - - public TopicParamsBuilder environment(String environment) { - this.m.environment = environment; - return this; - } - - public TopicParamsBuilder aftEnvironment(String aftEnvironment) { - this.m.aftEnvironment = aftEnvironment; - return this; - } - - public TopicParamsBuilder partner(String partner) { - this.m.partner = partner; - return this; - } - - public TopicParamsBuilder latitude(String latitude) { - this.m.latitude = latitude; - return this; - } - - public TopicParamsBuilder longitude(String longitude) { - this.m.longitude = longitude; - return this; - } - - public TopicParamsBuilder additionalProps(Map additionalProps) { - this.m.additionalProps = additionalProps; - return this; - } - - public BusTopicParams build() { - return m; - } - - } - - } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index c6bd5568..65f75aa5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,18 +85,52 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource @Override public void init() throws MalformedURLException { if (anyNullOrEmpty(this.userName, this.password)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, - this.useHttps, this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts) + .build()); } else { - this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner, - this.latitude, this.longitude, this.additionalProps, this.useHttps); + this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .userName(this.userName) + .password(this.password) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .environment(this.environment) + .aftEnvironment(this.aftEnvironment) + .partner(this.partner) + .latitude(this.latitude) + .longitude(this.longitude) + .additionalProps(this.additionalProps) + .useHttps(this.useHttps).build()); } logger.info("{}: INITTED", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index 03273a2b..fb20ccc4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,8 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource { /** - * - * @param busTopicParams Parameters object containing all the required inputs * + * @param busTopicParams Parameters object containing all the required inputs * @throws IllegalArgumentException An invalid parameter passed in */ @@ -50,9 +50,17 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i */ @Override public void init() { - this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps, - this.allowSelfSignedCerts); + this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder() + .servers(this.servers) + .topic(this.topic) + .apiKey(this.apiKey) + .apiSecret(this.apiSecret) + .consumerGroup(this.consumerGroup) + .consumerInstance(this.consumerInstance) + .fetchTimeout(this.fetchTimeout) + .fetchLimit(this.fetchLimit) + .useHttps(this.useHttps) + .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); } /** -- cgit 1.2.3-korg