aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java201
1 files changed, 99 insertions, 102 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 636dc6e3..6d34d32b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,7 +33,6 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
/**
* Wrapper around libraries to consume from message bus
- *
*/
public interface BusConsumer {
@@ -67,7 +66,7 @@ public interface BusConsumer {
/**
* Sets the server-side filter.
- *
+ *
* @param filter new filter value, or {@code null}
* @throws IllegalArgumentException if the consumer cannot be built with the new filter
*/
@@ -116,53 +115,47 @@ public interface BusConsumer {
/**
* Cambria Consumer Wrapper
+ * BusTopicParam object contains the following parameters
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams
* @throws GeneralSecurityException
* @throws MalformedURLException
*/
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
- boolean useSelfSignedCerts) {
- this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit, useHttps, useSelfSignedCerts);
- }
+ public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
- public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
-
- this.fetchTimeout = fetchTimeout;
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
this.builder = new CambriaClientBuilders.ConsumerBuilder();
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+ builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
+ .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
+ .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
// Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
builder.withSocketTimeout(fetchTimeout + 30000);
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
builder.usingHttps();
- if (useSelfSignedCerts) {
+ if (busTopicParams.isAllowSelfSignedCerts()) {
builder.allowSelfSignedCertificates();
}
}
- if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
+ builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
}
- if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
- builder.authenticatedByHttp(username, password);
+ if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
+ builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
}
try {
@@ -282,34 +275,36 @@ public interface BusConsumer {
/**
* MR Consumer Wrapper
+ * <p>
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * username AAF Login
+ * password AAF Password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams contains above listed attributes
* @throws MalformedURLException
*/
- public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit) throws MalformedURLException {
+ public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- this.fetchTimeout = fetchTimeout;
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
- if (topic == null || topic.isEmpty()) {
+ if (busTopicParams.isTopicNullOrEmpty()) {
throw new IllegalArgumentException("No topic for DMaaP");
}
- this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
- fetchLimit, null, apiKey, apiSecret);
+ this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
+ busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
+ busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
+ busTopicParams.getApiKey(), busTopicParams.getApiSecret());
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
+ this.consumer.setUsername(busTopicParams.getUserName());
+ this.consumer.setPassword(busTopicParams.getPassword());
}
@Override
@@ -374,29 +369,29 @@ public interface BusConsumer {
private final Properties props;
/**
+ * BusTopicParams contain the following parameters
* MR Consumer Wrapper
+ * <p>
+ * servers messaging bus hosts
+ * topic topic
+ * apiKey API Key
+ * apiSecret API Secret
+ * aafLogin AAF Login
+ * aafPassword AAF Password
+ * consumerGroup Consumer Group
+ * consumerInstance Consumer Instance
+ * fetchTimeout Fetch Timeout
+ * fetchLimit Fetch Limit
*
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
+ * @param busTopicParams contains above listed params
* @throws MalformedURLException
*/
- public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, boolean useHttps) throws MalformedURLException {
+ public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ super(busTopicParams);
// super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
+ if (busTopicParams.isServersNullOrEmpty()) {
throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
}
@@ -404,13 +399,13 @@ public interface BusConsumer {
props = new Properties();
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(servers.get(0) + ":3905");
+ this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
} else {
props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(servers.get(0) + ":3904");
+ this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
}
this.consumer.setProps(props);
@@ -434,70 +429,72 @@ public interface BusConsumer {
private final Properties props;
- public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
- String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
- int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
- String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
-
+ public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
- super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit);
+ super(busTopicParams);
- final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ final String dme2RouteOffer = busTopicParams.getAdditionalProps()
+ .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
- if (environment == null || environment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
}
- if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
}
- if (latitude == null || latitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ if (busTopicParams.isLatitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
}
- if (longitude == null || longitude.isEmpty()) {
- throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ if (busTopicParams.isLongitudeNullOrEmpty()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
}
- if ((dme2Partner == null || dme2Partner.isEmpty())
+ if ((busTopicParams.isPartnerNullOrEmpty())
&& (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
}
- final String serviceName = servers.get(0);
+ final String serviceName = busTopicParams.getServers().get(0);
this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
+ this.consumer.setUsername(busTopicParams.getUserName());
+ this.consumer.setPassword(busTopicParams.getPassword());
props = new Properties();
props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
+ props.setProperty("username", busTopicParams.getUserName());
+ props.setProperty("password", busTopicParams.getPassword());
/* These are required, no defaults */
- props.setProperty("topic", topic);
+ props.setProperty("topic", busTopicParams.getTopic());
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+ props.setProperty("Environment", busTopicParams.getEnvironment());
+ props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
- if (dme2Partner != null) {
- props.setProperty("Partner", dme2Partner);
+ if (busTopicParams.getPartner() != null) {
+ props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
+ props.setProperty("Latitude", busTopicParams.getLatitude());
+ props.setProperty("Longitude", busTopicParams.getLongitude());
/* These are optional, will default to these values if not set in additionalProps */
props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
@@ -511,7 +508,7 @@ public interface BusConsumer {
props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "GET");
- if (useHttps) {
+ if (busTopicParams.isUseHttps()) {
props.setProperty(PROTOCOL_PROP, "https");
} else {
@@ -520,8 +517,8 @@ public interface BusConsumer {
props.setProperty("contenttype", "application/json");
- if (additionalProps != null) {
- for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}