summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java3
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java119
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java13
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java6
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java201
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java88
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java29
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java316
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java26
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java97
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java41
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java230
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java55
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java18
15 files changed, 711 insertions, 538 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
index e7a21ca1..4e2f4ecf 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
@@ -75,7 +75,7 @@ public interface TopicEndpoint extends Startable, Lockable {
/**
* get the Topic Sources for the given topic name
*
- * @param topicName the topic name
+ * @param topicNames the topic name
*
* @return the Topic Source List
* @throws IllegalStateException if the entity is in an invalid state
@@ -150,7 +150,6 @@ public interface TopicEndpoint extends Startable, Lockable {
* infrastructure type
*
* @param topicName the topic name
- * @param commType communication infrastructure type
*
* @return the Topic Sink List
* @throws IllegalStateException if the entity is in an invalid state, for example multiple
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
index 26e8d413..08a1db8f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
@@ -3,13 +3,14 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -47,53 +49,50 @@ public interface DmaapTopicSinkFactory {
/**
* Instantiates a new DMAAP Topic Sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
- * @param partitionKey Consumer Group
- * @param environment DME2 environment
- * @param aftEnvironment DME2 AFT environment
- * @param partner DME2 Partner
- * @param latitude DME2 latitude
- * @param longitude DME2 longitude
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName AAF user name
+ * @param password AAF password
+ * @param partitionKey Consumer Group
+ * @param environment DME2 environment
+ * @param aftEnvironment DME2 AFT environment
+ * @param partner DME2 Partner
+ * @param latitude DME2 latitude
+ * @param longitude DME2 longitude
* @param additionalProps additional properties to pass to DME2
- * @param managed is this sink endpoint managed?
- *
+ * @param managed is this sink endpoint managed?
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts);
+ String password, String partitionKey, String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
+ boolean allowSelfSignedCerts);
/**
* Instantiates a new DMAAP Topic Sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param userName AAF user name
- * @param password AAF password
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param userName AAF user name
+ * @param password AAF password
* @param partitionKey Consumer Group
- * @param managed is this sink endpoint managed?
- *
+ * @param managed is this sink endpoint managed?
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+ String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
/**
* Creates an DMAAP Topic Sink based on properties files
- *
+ *
* @param properties Properties containing initialization values
- *
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -101,10 +100,9 @@ public interface DmaapTopicSinkFactory {
/**
* Instantiates a new DMAAP Topic Sink
- *
+ *
* @param servers list of servers
- * @param topic topic name
- *
+ * @param topic topic name
* @return an DMAAP Topic Sink
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -112,7 +110,7 @@ public interface DmaapTopicSinkFactory {
/**
* Destroys an DMAAP Topic Sink based on a topic
- *
+ *
* @param topic topic name
* @throws IllegalArgumentException if invalid parameters are present
*/
@@ -120,18 +118,17 @@ public interface DmaapTopicSinkFactory {
/**
* gets an DMAAP Topic Sink based on topic name
- *
+ *
* @param topic the topic name
- *
* @return an DMAAP Topic Sink with topic name
* @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
+ * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
*/
public DmaapTopicSink get(String topic);
/**
* Provides a snapshot of the DMAAP Topic Sinks
- *
+ *
* @return a list of the DMAAP Topic Sinks
*/
public List<DmaapTopicSink> inventory();
@@ -163,9 +160,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
@Override
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
- boolean allowSelfSignedCerts) {
+ String password, String partitionKey, String environment, String aftEnvironment, String partner,
+ String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
+ boolean allowSelfSignedCerts) {
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
@@ -176,9 +173,23 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
return dmaapTopicWriters.get(topic);
}
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
- password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps,
- useHttps, allowSelfSignedCerts);
+ DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(userName)
+ .password(password)
+ .partitionId(partitionKey)
+ .environment(environment)
+ .aftEnvironment(aftEnvironment)
+ .partner(partner)
+ .latitude(latitude)
+ .longitude(longitude)
+ .additionalProps(additionalProps)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
dmaapTopicWriters.put(topic, dmaapTopicSink);
@@ -189,7 +200,8 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
@Override
public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
+ String password, String partitionKey, boolean managed, boolean useHttps,
+ boolean allowSelfSignedCerts) {
if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException(MISSING_TOPIC);
@@ -200,8 +212,17 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
return dmaapTopicWriters.get(topic);
}
- DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
- password, partitionKey, useHttps, allowSelfSignedCerts);
+ DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .userName(userName)
+ .password(password)
+ .partitionId(partitionKey)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
dmaapTopicWriters.put(topic, dmaapTopicSink);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index 4285b3a9..11dfd292 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -204,7 +205,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
return dmaapTopicSources.get(topic);
}
- DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
.servers(servers)
.topic(topic)
.apiKey(apiKey)
@@ -255,7 +256,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
}
DmaapTopicSource dmaapTopicSource =
- new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
.servers(servers)
.topic(topic)
.apiKey(apiKey)
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
index a522e2c5..9d1bd8ad 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -141,8 +143,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
return uebTopicSinks.get(topic);
}
- UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey,
- useHttps, allowSelfSignedCerts);
+ UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder()
+ .servers(servers)
+ .topic(topic)
+ .apiKey(apiKey)
+ .apiSecret(apiSecret)
+ .partitionId(partitionKey)
+ .useHttps(useHttps)
+ .allowSelfSignedCerts(allowSelfSignedCerts)
+ .build());
if (managed) {
uebTopicSinks.put(topic, uebTopicWriter);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
index 4c3cbbf8..8d3f28e9 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
@@ -161,7 +162,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
return uebTopicSources.get(topic);
}
- UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+ UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder()
.servers(servers)
.topic(topic)
.apiKey(apiKey)
@@ -366,5 +367,4 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
builder.append("IndexedUebTopicSourceFactory []");
return builder.toString();
}
-
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 636dc6e3..6d34d32b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,7 +33,6 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
/**
* Wrapper around libraries to consume from message bus
- *
*/
public interface BusConsumer {
@@ -67,7 +66,7 @@ public interface BusConsumer {
/**
* Sets the server-side filter.
- *
+ *
* @param filter new filter value, or {@code null}
* @throws IllegalArgumentException if the consumer cannot be built with the new filter
*/
@@ -116,53 +115,47 @@ public interface BusConsumer {
/**
* Cambria Consumer Wrapper
+ * BusTopicParam object contains the following parameters
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams
* @throws GeneralSecurityException
* @throws MalformedURLException
*/
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
- boolean useSelfSignedCerts) {
- this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit, useHttps, useSelfSignedCerts);
- }
+ public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
-
- this.fetchTimeout = fetchTimeout;
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
this.builder = new CambriaClientBuilders.ConsumerBuilder();
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+ builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
+ .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
+ .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
// Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
builder.withSocketTimeout(fetchTimeout + 30000);
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
builder.usingHttps();
- if (useSelfSignedCerts) {
+ if (busTopicParams.isAllowSelfSignedCerts()) {
builder.allowSelfSignedCertificates();
}
}
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
+ builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
}
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
+ if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
+ builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
}
try {
@@ -282,34 +275,36 @@ public interface BusConsumer {
/**
* MR Consumer Wrapper
+ * <p>
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * username AAF Login
+ * password AAF Password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams contains above listed attributes
* @throws MalformedURLException
*/
- public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit) throws MalformedURLException {
+ public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- this.fetchTimeout = fetchTimeout;
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.isTopicNullOrEmpty()) {
throw new IllegalArgumentException("No topic for DMaaP");
}
- this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit, null, apiKey, apiSecret);
+ this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
+ busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
+ busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
+ busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
+ this.consumer.setUsername(busTopicParams.getUserName());
+ this.consumer.setPassword(busTopicParams.getPassword());
}
@Override
@@ -374,29 +369,29 @@ public interface BusConsumer {
private final Properties props;
/**
+ * BusTopicParams contain the following parameters
* MR Consumer Wrapper
+ * <p>
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * aafLogin AAF Login
+ * aafPassword AAF Password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams contains above listed params
* @throws MalformedURLException
*/
- public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps) throws MalformedURLException {
+ public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ super(busTopicParams);
// super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
+ if (busTopicParams.isServersNullOrEmpty()) {
throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
}
@@ -404,13 +399,13 @@ public interface BusConsumer {
props = new Properties();
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(servers.get(0) + ":3905");
+ this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
} else {
props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(servers.get(0) + ":3904");
+ this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
}
this.consumer.setProps(props);
@@ -434,70 +429,72 @@ public interface BusConsumer {
private final Properties props;
- public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
- String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
-
+ public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ super(busTopicParams);
- final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ final String dme2RouteOffer = busTopicParams.getAdditionalProps()
+ .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
}
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
}
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ if (busTopicParams.isLatitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
}
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ if (busTopicParams.isLongitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
- if ((dme2Partner == null || dme2Partner.isEmpty())
+ if ((busTopicParams.isPartnerNullOrEmpty())
&& (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
- final String serviceName = servers.get(0);
+ final String serviceName = busTopicParams.getServers().get(0);
this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
+ this.consumer.setUsername(busTopicParams.getUserName());
+ this.consumer.setPassword(busTopicParams.getPassword());
props = new Properties();
props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
+ props.setProperty("username", busTopicParams.getUserName());
+ props.setProperty("password", busTopicParams.getPassword());
/* These are required, no defaults */
- props.setProperty("topic", topic);
+ props.setProperty("topic", busTopicParams.getTopic());
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+ props.setProperty("Environment", busTopicParams.getEnvironment());
+ props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
- if (dme2Partner != null) {
- props.setProperty("Partner", dme2Partner);
+ if (busTopicParams.getPartner() != null) {
+ props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
+ props.setProperty("Latitude", busTopicParams.getLatitude());
+ props.setProperty("Longitude", busTopicParams.getLongitude());
/* These are optional, will default to these values if not set in additionalProps */
props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
@@ -511,7 +508,7 @@ public interface BusConsumer {
props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "GET");
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
} else {
@@ -520,8 +517,8 @@ public interface BusConsumer {
props.setProperty("contenttype", "application/json");
- if (additionalProps != null) {
- for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 9db9131c..348100ab 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,7 +48,7 @@ public interface BusPublisher {
/**
* sends a message
*
- * @param partition id
+ * @param partitionId id
* @param message the message
* @return true if success, false otherwise
* @throws IllegalArgumentException if no message provided
@@ -72,23 +73,17 @@ public interface BusPublisher {
@JsonIgnore
protected volatile CambriaBatchingPublisher publisher;
- public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- boolean useHttps) {
- this(servers, topic, apiKey, apiSecret, null, null, useHttps, false);
- }
-
- public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, boolean useHttps, boolean selfSignedCerts) {
+ public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
- builder.usingHosts(servers).onTopic(topic);
+ builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
// Set read timeout to 30 seconds (TBD: this should be configurable)
builder.withSocketTimeout(30000);
- if (useHttps) {
- if (selfSignedCerts) {
+ if (busTopicParams.isUseHttps()) {
+ if (busTopicParams.isAllowSelfSignedCerts()) {
builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
} else {
builder.withConnectionType(ConnectionType.HTTPS);
@@ -96,12 +91,12 @@ public interface BusPublisher {
}
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
+ builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
}
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
+ if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
+ builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
}
try {
@@ -297,55 +292,60 @@ public interface BusPublisher {
}
public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
- public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password,
- String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude,
- Map<String, String> additionalProps, boolean useHttps) {
-
- super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
-
-
-
- String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
+
+ super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
+ busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
+ String dme2RouteOffer = null;
+ if (busTopicParams.isAdditionalPropsValid()) {
+ dme2RouteOffer = busTopicParams.getAdditionalProps().get(
+ DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ }
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
}
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
}
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ if (busTopicParams.isLatitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
}
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ if (busTopicParams.isLongitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
- if ((dme2Partner == null || dme2Partner.isEmpty())
- && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ if ((busTopicParams.isPartnerNullOrEmpty())
+ && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) {
throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
- String serviceName = servers.get(0);
+ String serviceName = busTopicParams.getServers().get(0);
/* These are required, no defaults */
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+ props.setProperty("Environment", busTopicParams.getEnvironment());
+ props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
- if (dme2Partner != null) {
- props.setProperty("Partner", dme2Partner);
+ if (busTopicParams.getPartner() != null) {
+ props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
+ props.setProperty("Latitude", busTopicParams.getLatitude());
+ props.setProperty("Longitude", busTopicParams.getLongitude());
// ServiceName also a default, found in additionalProps
@@ -361,7 +361,7 @@ public interface BusPublisher {
props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "POST");
- for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
index 7f4c0ddd..08993126 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
@@ -52,25 +52,22 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
/**
* Instantiates a new Bus Topic Base
*
- * @param servers list of servers
- * @param topic topic name
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
+ * servers list of servers
+ * topic topic name
+ * apiKey API Key
+ * apiSecret API Secret
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ * @param busTopicParams
* @return a Bus Topic Base
* @throws IllegalArgumentException if invalid parameters are present
*/
- public BusTopicBase(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps,
- boolean allowSelfSignedCerts) {
-
- super(servers, topic);
-
- this.apiKey = apiKey;
- this.apiSecret = apiSecret;
- this.useHttps = useHttps;
- this.allowSelfSignedCerts = allowSelfSignedCerts;
+ public BusTopicBase(BusTopicParams busTopicParams) {
+ super(busTopicParams.getServers(), busTopicParams.getTopic());
+ this.apiKey = busTopicParams.getApiKey();
+ this.apiSecret = busTopicParams.getApiSecret();
+ this.useHttps = busTopicParams.isUseHttps();
+ this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
new file mode 100644
index 00000000..ffefcbf2
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
@@ -0,0 +1,316 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Member variables of this Params class are as follows
+ * servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ */
+public class BusTopicParams {
+
+ public static TopicParamsBuilder builder() {
+ return new TopicParamsBuilder();
+ }
+
+ private List<String> servers;
+ private String topic;
+ private String apiKey;
+ private String apiSecret;
+ private String consumerGroup;
+ private String consumerInstance;
+ private int fetchTimeout;
+ private int fetchLimit;
+ private boolean useHttps;
+ private boolean allowSelfSignedCerts;
+
+ private String userName;
+ private String password;
+ private String environment;
+ private String aftEnvironment;
+ private String partner;
+ private String latitude;
+ private String longitude;
+ private Map<String, String> additionalProps;
+ private String partitionId;
+
+ String getPartitionId() {
+ return partitionId;
+ }
+
+ String getUserName() {
+ return userName;
+ }
+
+ String getPassword() {
+ return password;
+ }
+
+ String getEnvironment() {
+ return environment;
+ }
+
+ String getAftEnvironment() {
+ return aftEnvironment;
+ }
+
+ String getPartner() {
+ return partner;
+ }
+
+ String getLatitude() {
+ return latitude;
+ }
+
+ String getLongitude() {
+ return longitude;
+ }
+
+ Map<String, String> getAdditionalProps() {
+ return additionalProps;
+ }
+
+ List<String> getServers() {
+ return servers;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ String getApiKey() {
+ return apiKey;
+ }
+
+ String getApiSecret() {
+ return apiSecret;
+ }
+
+ String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ String getConsumerInstance() {
+ return consumerInstance;
+ }
+
+ int getFetchTimeout() {
+ return fetchTimeout;
+ }
+
+ int getFetchLimit() {
+ return fetchLimit;
+ }
+
+ boolean isUseHttps() {
+ return useHttps;
+ }
+
+ boolean isAllowSelfSignedCerts() {
+ return allowSelfSignedCerts;
+ }
+
+ boolean isEnvironmentNullOrEmpty() {
+ return (environment == null || environment.trim().isEmpty());
+ }
+
+ boolean isAftEnvironmentNullOrEmpty() {
+ return (aftEnvironment == null || aftEnvironment.trim().isEmpty());
+ }
+
+ boolean isLatitudeNullOrEmpty() {
+ return (latitude == null || latitude.trim().isEmpty());
+ }
+
+ boolean isLongitudeNullOrEmpty() {
+ return (longitude == null || longitude.trim().isEmpty());
+ }
+
+ boolean isConsumerInstanceNullOrEmpty() {
+ return (consumerInstance == null || consumerInstance.trim().isEmpty());
+ }
+
+ boolean isConsumerGroupNullOrEmpty() {
+ return (consumerGroup == null || consumerGroup.trim().isEmpty());
+ }
+
+ boolean isApiKeyValid() {
+ return !(apiKey == null || apiKey.trim().isEmpty());
+ }
+
+ boolean isApiSecretValid() {
+ return !(apiSecret == null || apiSecret.trim().isEmpty());
+ }
+
+ boolean isUserNameValid() {
+ return !(userName == null || userName.trim().isEmpty());
+ }
+
+ boolean isPasswordValid() {
+ return !(password == null || password.trim().isEmpty());
+ }
+
+ boolean isPartnerNullOrEmpty() {
+ return (partner == null || partner.trim().isEmpty());
+ }
+
+ boolean isServersNullOrEmpty() {
+ return (servers == null || servers.isEmpty()
+ || (servers.size() == 1 && ("".equals(servers.get(0)))));
+ }
+
+ boolean isAdditionalPropsValid() {
+ return additionalProps != null;
+ }
+
+ boolean isTopicNullOrEmpty() {
+ return (topic == null || topic.trim().isEmpty());
+ }
+
+ boolean isPartitionIdNullOrEmpty() {
+ return (partitionId == null || partitionId.trim().isEmpty());
+ }
+
+ public static class TopicParamsBuilder {
+ BusTopicParams m = new BusTopicParams();
+
+ private TopicParamsBuilder() {
+ }
+
+ public TopicParamsBuilder servers(List<String> servers) {
+ this.m.servers = servers;
+ return this;
+ }
+
+ public TopicParamsBuilder topic(String topic) {
+ this.m.topic = topic;
+ return this;
+ }
+
+ public TopicParamsBuilder apiKey(String apiKey) {
+ this.m.apiKey = apiKey;
+ return this;
+ }
+
+ public TopicParamsBuilder apiSecret(String apiSecret) {
+ this.m.apiSecret = apiSecret;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerGroup(String consumerGroup) {
+ this.m.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerInstance(String consumerInstance) {
+ this.m.consumerInstance = consumerInstance;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+ this.m.fetchTimeout = fetchTimeout;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchLimit(int fetchLimit) {
+ this.m.fetchLimit = fetchLimit;
+ return this;
+ }
+
+ public TopicParamsBuilder useHttps(boolean useHttps) {
+ this.m.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+ this.m.allowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public TopicParamsBuilder userName(String userName) {
+ this.m.userName = userName;
+ return this;
+ }
+
+ public TopicParamsBuilder password(String password) {
+ this.m.password = password;
+ return this;
+ }
+
+ public TopicParamsBuilder environment(String environment) {
+ this.m.environment = environment;
+ return this;
+ }
+
+ public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+ this.m.aftEnvironment = aftEnvironment;
+ return this;
+ }
+
+ public TopicParamsBuilder partner(String partner) {
+ this.m.partner = partner;
+ return this;
+ }
+
+ public TopicParamsBuilder latitude(String latitude) {
+ this.m.latitude = latitude;
+ return this;
+ }
+
+ public TopicParamsBuilder longitude(String longitude) {
+ this.m.longitude = longitude;
+ return this;
+ }
+
+ public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+ this.m.additionalProps = additionalProps;
+ return this;
+ }
+
+ public TopicParamsBuilder partitionId(String partitionId) {
+ this.m.partitionId = partitionId;
+ return this;
+ }
+
+ public BusTopicParams build() {
+ return m;
+ }
+
+ }
+}
+
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index f3c736da..5493468a 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,23 +53,24 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
/**
* constructor for abstract sink
- *
- * @param servers servers
- * @param topic topic
- * @param apiKey api secret
- * @param apiSecret api secret
- * @param partitionId partition id
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
+ * @param busTopicParams contains below listed attributes
+ * servers servers
+ * topic topic
+ * apiKey api secret
+ * apiSecret api secret
+ * partitionId partition id
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow *
* @throws IllegalArgumentException in invalid parameters are passed in
*/
- public InlineBusTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId,
- boolean useHttps, boolean allowSelfSignedCerts) {
+ public InlineBusTopicSink(BusTopicParams busTopicParams) {
- super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
+ super(busTopicParams);
- if (partitionId == null || partitionId.isEmpty()) {
+ if (busTopicParams.isPartitionIdNullOrEmpty()) {
this.partitionId = UUID.randomUUID().toString();
+ } else {
+ this.partitionId = busTopicParams.getPartitionId();
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
index 3ea7185e..3dd40312 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -3,13 +3,14 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,65 +49,67 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
protected Map<String, String> additionalProps = null;
/**
- *
- * @param servers DMaaP servers
- * @param topic DMaaP Topic to be monitored
- * @param apiKey DMaaP API Key (optional)
- * @param apiSecret DMaaP API Secret (optional)
- * @param consumerGroup DMaaP Reader Consumer Group
- * @param consumerInstance DMaaP Reader Instance
- * @param fetchTimeout DMaaP fetch timeout
- * @param fetchLimit DMaaP fetch limit
- * @param environment DME2 Environment
- * @param aftEnvironment DME2 AFT Environment
- * @param partner DME2 Partner
- * @param latitude DME2 Latitude
- * @param longitude DME2 Longitude
- * @param additionalProps Additional properties to pass to DME2
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
+ * BusTopicParams contains the below mentioned attributes
+ * servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ * @param busTopicParams Contains the above mentioned parameters
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, String environment, String aftEnvironment, String partner,
- String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps,
- boolean allowSelfSignedCerts) {
+ public InlineDmaapTopicSink(BusTopicParams busTopicParams) {
- super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
+ super(busTopicParams);
- this.userName = userName;
- this.password = password;
+ this.userName = busTopicParams.getUserName();
+ this.password = busTopicParams.getPassword();
- this.environment = environment;
- this.aftEnvironment = aftEnvironment;
- this.partner = partner;
+ this.environment = busTopicParams.getEnvironment();
+ this.aftEnvironment = busTopicParams.getAftEnvironment();
+ this.partner = busTopicParams.getPartner();
- this.latitude = latitude;
- this.longitude = longitude;
+ this.latitude = busTopicParams.getLatitude();
+ this.longitude = busTopicParams.getLongitude();
- this.additionalProps = additionalProps;
- }
-
- public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
- String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
-
- super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
-
- this.userName = userName;
- this.password = password;
+ this.additionalProps = busTopicParams.getAdditionalProps();
}
@Override
public void init() {
if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey,
- this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
+ this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .userName(this.userName)
+ .password(this.password)
+ .useHttps(this.useHttps)
+ .allowSelfSignedCerts(this.allowSelfSignedCerts)
+ .build());
} else {
- this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName,
- this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
- this.additionalProps, this.useHttps);
+ this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .userName(this.userName)
+ .password(this.password)
+ .environment(this.environment)
+ .aftEnvironment(this.aftEnvironment)
+ .partner(this.partner)
+ .latitude(this.latitude)
+ .longitude(this.longitude)
+ .additionalProps(this.additionalProps)
+ .useHttps(this.useHttps)
+ .build());
}
logger.info("{}: DMAAP SINK created", this);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
index fefe6493..218e44b4 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
@@ -3,13 +3,14 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,21 +40,21 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
/**
- * Argument-based UEB Topic Writer instantiation
- *
- * @param servers list of UEB servers available for publishing
- * @param topic the topic to publish to
- * @param apiKey the api key (optional)
- * @param apiSecret the api secret (optional)
- * @param partitionId the partition key (optional, autogenerated if not provided)
- * @param useHttps does connection use HTTPS?
- * @param allowSelfSignedCerts are self-signed certificates allow
- *
+ * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
+ * attributes
+ *
+ * servers list of UEB servers available for publishing
+ * topic the topic to publish to
+ * apiKey the api key (optional)
+ * apiSecret the api secret (optional)
+ * partitionId the partition key (optional, autogenerated if not provided)
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ * @param busTopicParams contains attributes needed
* @throws IllegalArgumentException if invalid arguments are detected
*/
- public InlineUebTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId,
- boolean useHttps, boolean allowSelfSignedCerts) {
- super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
+ public InlineUebTopicSink(BusTopicParams busTopicParams) {
+ super(busTopicParams);
}
/**
@@ -62,8 +63,14 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
@Override
public void init() {
- this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
- null, null, this.useHttps, this.allowSelfSignedCerts);
+ this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .useHttps(this.useHttps)
+ .allowSelfSignedCerts(this.allowSelfSignedCerts)
+ .build());
logger.info("{}: UEB SINK created", this);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 74912cae..400cbfe2 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +22,6 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
@@ -85,15 +84,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
*/
public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
- super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts());
+ super(busTopicParams);
- if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) {
+ if (busTopicParams.isConsumerGroupNullOrEmpty()) {
this.consumerGroup = UUID.randomUUID().toString();
} else {
this.consumerGroup = busTopicParams.getConsumerGroup();
}
- if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) {
+ if (busTopicParams.isConsumerInstanceNullOrEmpty()) {
this.consumerInstance = NetworkUtil.getHostname();
} else {
this.consumerInstance = busTopicParams.getConsumerInstance();
@@ -312,225 +311,4 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return fetchLimit;
}
- /**
- * Member variables of this Params class are as follows
- * servers DMaaP servers
- * topic DMaaP Topic to be monitored
- * apiKey DMaaP API Key (optional)
- * apiSecret DMaaP API Secret (optional)
- * consumerGroup DMaaP Reader Consumer Group
- * consumerInstance DMaaP Reader Instance
- * fetchTimeout DMaaP fetch timeout
- * fetchLimit DMaaP fetch limit
- * environment DME2 Environment
- * aftEnvironment DME2 AFT Environment
- * partner DME2 Partner
- * latitude DME2 Latitude
- * longitude DME2 Longitude
- * additionalProps Additional properties to pass to DME2
- * useHttps does connection use HTTPS?
- * allowSelfSignedCerts are self-signed certificates allow
- *
- */
- public static class BusTopicParams {
-
- public static TopicParamsBuilder builder() {
- return new TopicParamsBuilder();
- }
- private List<String> servers;
- private String topic;
- private String apiKey;
- private String apiSecret;
- private String consumerGroup;
- private String consumerInstance;
- private int fetchTimeout;
- private int fetchLimit;
- private boolean useHttps;
- private boolean allowSelfSignedCerts;
-
- private String userName;
- private String password;
- private String environment;
- private String aftEnvironment;
- private String partner;
- private String latitude;
- private String longitude;
- private Map<String, String> additionalProps;
-
- public String getUserName() {
- return userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- public String getEnvironment() {
- return environment;
- }
-
- public String getAftEnvironment() {
- return aftEnvironment;
- }
-
- public String getPartner() {
- return partner;
- }
-
- public String getLatitude() {
- return latitude;
- }
-
- public String getLongitude() {
- return longitude;
- }
-
- public Map<String, String> getAdditionalProps() {
- return additionalProps;
- }
-
- public List<String> getServers() {
- return servers;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getApiKey() {
- return apiKey;
- }
-
- public String getApiSecret() {
- return apiSecret;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public String getConsumerInstance() {
- return consumerInstance;
- }
-
- public int getFetchTimeout() {
- return fetchTimeout;
- }
-
- public int getFetchLimit() {
- return fetchLimit;
- }
-
- public boolean isUseHttps() {
- return useHttps;
- }
-
- public boolean isAllowSelfSignedCerts() {
- return allowSelfSignedCerts;
- }
-
-
- public static class TopicParamsBuilder {
- BusTopicParams m = new BusTopicParams();
-
- private TopicParamsBuilder() {
- }
-
- public TopicParamsBuilder servers(List<String> servers) {
- this.m.servers = servers;
- return this;
- }
-
- public TopicParamsBuilder topic(String topic) {
- this.m.topic = topic;
- return this;
- }
-
- public TopicParamsBuilder apiKey(String apiKey) {
- this.m.apiKey = apiKey;
- return this;
- }
-
- public TopicParamsBuilder apiSecret(String apiSecret) {
- this.m.apiSecret = apiSecret;
- return this;
- }
-
- public TopicParamsBuilder consumerGroup(String consumerGroup) {
- this.m.consumerGroup = consumerGroup;
- return this;
- }
-
- public TopicParamsBuilder consumerInstance(String consumerInstance) {
- this.m.consumerInstance = consumerInstance;
- return this;
- }
-
- public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
- this.m.fetchTimeout = fetchTimeout;
- return this;
- }
-
- public TopicParamsBuilder fetchLimit(int fetchLimit) {
- this.m.fetchLimit = fetchLimit;
- return this;
- }
-
- public TopicParamsBuilder useHttps(boolean useHttps) {
- this.m.useHttps = useHttps;
- return this;
- }
-
- public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
- this.m.allowSelfSignedCerts = allowSelfSignedCerts;
- return this;
- }
-
- public TopicParamsBuilder userName(String userName) {
- this.m.userName = userName;
- return this;
- }
-
- public TopicParamsBuilder password(String password) {
- this.m.password = password;
- return this;
- }
-
- public TopicParamsBuilder environment(String environment) {
- this.m.environment = environment;
- return this;
- }
-
- public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
- this.m.aftEnvironment = aftEnvironment;
- return this;
- }
-
- public TopicParamsBuilder partner(String partner) {
- this.m.partner = partner;
- return this;
- }
-
- public TopicParamsBuilder latitude(String latitude) {
- this.m.latitude = latitude;
- return this;
- }
-
- public TopicParamsBuilder longitude(String longitude) {
- this.m.longitude = longitude;
- return this;
- }
-
- public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
- this.m.additionalProps = additionalProps;
- return this;
- }
-
- public BusTopicParams build() {
- return m;
- }
-
- }
-
- }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
index c6bd5568..65f75aa5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,18 +85,52 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
@Override
public void init() throws MalformedURLException {
if (anyNullOrEmpty(this.userName, this.password)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
- this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
- this.useHttps, this.allowSelfSignedCerts);
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .consumerGroup(this.consumerGroup)
+ .consumerInstance(this.consumerInstance)
+ .fetchTimeout(this.fetchTimeout)
+ .fetchLimit(this.fetchLimit)
+ .useHttps(this.useHttps)
+ .allowSelfSignedCerts(this.allowSelfSignedCerts)
+ .build());
} else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
- this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .userName(this.userName)
+ .password(this.password)
+ .consumerGroup(this.consumerGroup)
+ .consumerInstance(this.consumerInstance)
+ .fetchTimeout(this.fetchTimeout)
+ .fetchLimit(this.fetchLimit)
+ .useHttps(this.useHttps)
+ .allowSelfSignedCerts(this.allowSelfSignedCerts)
+ .build());
} else {
- this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
- this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
- this.latitude, this.longitude, this.additionalProps, this.useHttps);
+ this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .userName(this.userName)
+ .password(this.password)
+ .consumerGroup(this.consumerGroup)
+ .consumerInstance(this.consumerInstance)
+ .fetchTimeout(this.fetchTimeout)
+ .fetchLimit(this.fetchLimit)
+ .environment(this.environment)
+ .aftEnvironment(this.aftEnvironment)
+ .partner(this.partner)
+ .latitude(this.latitude)
+ .longitude(this.longitude)
+ .additionalProps(this.additionalProps)
+ .useHttps(this.useHttps).build());
}
logger.info("{}: INITTED", this);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
index 03273a2b..fb20ccc4 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,8 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
/**
- *
- * @param busTopicParams Parameters object containing all the required inputs *
+ * @param busTopicParams Parameters object containing all the required inputs
* @throws IllegalArgumentException An invalid parameter passed in
*/
@@ -50,9 +50,17 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i
*/
@Override
public void init() {
- this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
- this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
- this.allowSelfSignedCerts);
+ this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+ .servers(this.servers)
+ .topic(this.topic)
+ .apiKey(this.apiKey)
+ .apiSecret(this.apiSecret)
+ .consumerGroup(this.consumerGroup)
+ .consumerInstance(this.consumerInstance)
+ .fetchTimeout(this.fetchTimeout)
+ .fetchLimit(this.fetchLimit)
+ .useHttps(this.useHttps)
+ .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
}
/**