summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org
diff options
context:
space:
mode:
authorRyan Young <ry303t@att.com>2018-04-06 17:32:41 -0400
committerPatrick Brady <pb071s@att.com>2018-04-09 17:52:45 +0000
commitb96311a375b16d1c237f8e99b8eca6024638262b (patch)
tree394a32a1106cb7790f10e7ee4aca12b2857e26e1 /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org
parent674e3b93deddb432513f5860ebf595367f2ea292 (diff)
Enhance DMaaP Adapter Configuration
Change-Id: I5385cf2710fc33a85da9a67d5f4d31dce1e460aa Signed-off-by: Ryan Young <ry303t@att.com> Issue-ID: APPC-658
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java71
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java70
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java114
3 files changed, 165 insertions, 90 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
index 6f907ae20..40ee1c71f 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP : APPC
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Copyright (C) 2017 Amdocs
* =============================================================================
@@ -18,7 +18,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
@@ -50,27 +49,26 @@ import org.osgi.framework.ServiceReference;
public class DmaapConsumerImpl implements Consumer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
- private final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+ private final Configuration configuration = ConfigurationFactory.getConfiguration();
// Default values
- private static final int DEFAULT_TIMEOUT_MS = 60000;
- private static final int DEFAULT_LIMIT = 1000;
- private String topic;
- private boolean isMetricEnabled = false;
- private boolean useHttps = false;
- private MetricRegistry metricRegistry;
- private MRConsumer client = null;
- private Properties props = null;
-
+ private static final int DEFAULT_TIMEOUT_MS = 60000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private String topic;
+ private boolean isMetricEnabled = false;
+ private boolean useHttps = false;
+ private MetricRegistry metricRegistry;
+ private MRConsumer client = null;
+ private Properties props = null;
public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password) {
+ String user, String password) {
- this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+ this(urls, topicName, consumerGroupName, consumerId, user, password, null);
}
public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password, String filter) {
+ String user, String password, String filter) {
this.topic = topicName;
this.props = new Properties();
@@ -78,8 +76,13 @@ public class DmaapConsumerImpl implements Consumer {
props.setProperty("host", urlsStr);
props.setProperty("group", consumerGroupName);
props.setProperty("id", consumerId);
- props.setProperty("username", user);
- props.setProperty("password", password);
+ if (user != null && password != null) {
+ props.setProperty("username", user);
+ props.setProperty("password", password);
+ } else {
+ props.setProperty("TransportType", "HTTPNOAUTH");
+ }
+
if (filter != null) {
props.setProperty("filter", filter);
}
@@ -92,22 +95,17 @@ public class DmaapConsumerImpl implements Consumer {
metricRegistry = metricService.createRegistry("APPC");
DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder()
- .withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
+ .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0).withPublishedMessage(0).build();
if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ Metric[] metrics = new Metric[] { dmaapKpiMetric };
LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
LogPublisher[] logPublishers = new LogPublisher[1];
logPublishers[0] = logPublisher;
PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder().withPublishers(logPublishers)
- .withMetrics(metrics)
- .build();
+ .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
LOG.debug("Policy getting initialized");
manuallyScheduledPublishingPolicy.init();
@@ -121,12 +119,12 @@ public class DmaapConsumerImpl implements Consumer {
*/
private synchronized MRConsumer getClient(int waitMs, int limit) {
try {
- props.setProperty("timeout",String.valueOf(waitMs));
- props.setProperty("limit",String.valueOf(limit));
- String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
+ props.setProperty("timeout", String.valueOf(waitMs));
+ props.setProperty("limit", String.valueOf(limit));
+ String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props);
return MRClientFactory.createConsumer(topicProducerPropFileName);
} catch (IOException e1) {
- LOG.error("failed to createConsumer",e1);
+ LOG.error("failed to createConsumer", e1);
return null;
}
}
@@ -134,8 +132,8 @@ public class DmaapConsumerImpl implements Consumer {
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user",String.valueOf(key));
- props.setProperty("password",String.valueOf(secret));
+ props.setProperty("username", String.valueOf(key));
+ props.setProperty("password", String.valueOf(secret));
client = null;
}
@@ -227,4 +225,11 @@ public class DmaapConsumerImpl implements Consumer {
}
}
+ public Properties getProperties() {
+ return props;
+ }
+
+ public boolean isHttps() {
+ return useHttps;
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
index 3fbfc95c6..3a39a98db 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
@@ -53,21 +53,20 @@ import org.osgi.framework.ServiceReference;
public class DmaapProducerImpl implements Producer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
+ private static final Configuration configuration = ConfigurationFactory.getConfiguration();
- private Set<String> topics;
+ private Set<String> topics;
- private Properties props = null;
- private MetricRegistry metricRegistry;
- private boolean useHttps = false;
- private boolean isMetricEnabled = false;
-
- private Set<MRBatchingPublisher> clients;
+ private Properties props = null;
+ private MetricRegistry metricRegistry;
+ private boolean useHttps = false;
+ private boolean isMetricEnabled = false;
+
+ private Set<MRBatchingPublisher> clients;
-
public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) {
- this(urls, (Set<String>)null, user, password);
+ this(urls, (Set<String>) null, user, password);
this.topics = new HashSet<>();
if (topicName != null) {
Collections.addAll(topics, topicName.split(","));
@@ -76,15 +75,19 @@ public class DmaapProducerImpl implements Producer {
public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
topics = topicNames;
- if (urls == null || user == null || password == null) {
- throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password");
+ if (urls == null) {
+ throw new IllegalArgumentException("Mandaory argument is null: urls");
}
this.props = new Properties();
String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host",urlsStr);
+ props.setProperty("host", urlsStr);
props.setProperty("id", UUID.randomUUID().toString());
- props.setProperty("username",user);
- props.setProperty("password",password);
+ if (user != null && password != null) {
+ props.setProperty("username", user);
+ props.setProperty("password", password);
+ } else {
+ props.setProperty("TransportType", "HTTPNOAUTH");
+ }
}
private void initMetric() {
@@ -94,23 +97,17 @@ public class DmaapProducerImpl implements Producer {
metricRegistry = metricService.createRegistry("APPC");
DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder()
- .withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
+ .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0).withPublishedMessage(0).build();
if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ Metric[] metrics = new Metric[] { dmaapKpiMetric };
LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
LogPublisher[] logPublishers = new LogPublisher[1];
logPublishers[0] = logPublisher;
PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder()
- .withPublishers(logPublishers)
- .withMetrics(metrics)
- .build();
+ .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
LOG.debug("Policy getting initialized");
manuallyScheduledPublishingPolicy.init();
@@ -123,7 +120,7 @@ public class DmaapProducerImpl implements Producer {
Set<MRBatchingPublisher> out = new HashSet<>();
for (String topic : topics) {
try {
- String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
+ String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic, props);
final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
out.add(client);
} catch (Exception e) {
@@ -136,7 +133,7 @@ public class DmaapProducerImpl implements Producer {
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user", String.valueOf(key));
+ props.setProperty("username", String.valueOf(key));
props.setProperty("password", String.valueOf(secret));
clients = null;
}
@@ -145,20 +142,20 @@ public class DmaapProducerImpl implements Producer {
public boolean post(String partition, String data) {
boolean success = true;
Properties properties = configuration.getProperties();
- if (properties != null && properties.getProperty("metric.enabled") != null ) {
+ if (properties != null && properties.getProperty("metric.enabled") != null) {
isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
}
if (isMetricEnabled) {
initMetric();
}
-
- // Create clients once and reuse them on subsequent posts. This is
+
+ // Create clients once and reuse them on subsequent posts. This is
// to support failover to other servers in the Dmaap cluster.
if ((clients == null) || (clients.isEmpty())) {
LOG.info("Getting CambriaBatchingPublisher Clients ...");
clients = getClients();
}
-
+
for (MRBatchingPublisher client : clients) {
try {
LOG.debug(String.format("Posting %s to %s", data, client));
@@ -190,7 +187,7 @@ public class DmaapProducerImpl implements Producer {
for (MRBatchingPublisher client : clients) {
try {
client.close(1, TimeUnit.SECONDS);
- } catch (IOException | InterruptedException e) {
+ } catch (IOException | InterruptedException e) {
LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
}
}
@@ -214,4 +211,11 @@ public class DmaapProducerImpl implements Producer {
}
}
+ public Properties getProperties() {
+ return props;
+ }
+
+ public boolean isHttps() {
+ return useHttps;
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
index 7a65311df..72e0a265b 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP : APPC
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Copyright (C) 2017 Amdocs
* =============================================================================
@@ -18,72 +18,138 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
package org.onap.appc.adapter.messaging.dmaap.impl;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class DmaapUtil {
- private static final char DELIMITER = '_';
+ private static final char DELIMITER = '_';
+
+ static final String DMAAP_PROPERTIES_PATH = "org.onap.appc.dmaap.profile.path";
+
+ private static final Logger log = LoggerFactory.getLogger(DmaapUtil.class);
private DmaapUtil() {
}
private static String createPreferredRouteFileIfNotExist(String topic) throws IOException {
String topicPreferredRouteFileName;
- topicPreferredRouteFileName = topic+"preferredRoute.properties";
- File fo= new File(topicPreferredRouteFileName);
- if(!fo.exists()) {
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ File fo = new File(topicPreferredRouteFileName);
+ if (!fo.exists()) {
ClassLoader classLoader = DmaapUtil.class.getClassLoader();
InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt");
Properties props = new Properties();
props.load(inputStream);
- String fileName = topic != null ? topic+ DELIMITER +"MR1" : DELIMITER +"MR1";
+ String fileName = topic != null ? topic + DELIMITER + "MR1" : DELIMITER + "MR1";
props.setProperty("preferredRouteKey", fileName);
topicPreferredRouteFileName = topic + "preferredRoute.properties";
- props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis());
+ props.store(new FileOutputStream(topicPreferredRouteFileName),
+ "preferredRoute.properties file created on the fly for topic:" + topic + " on:"
+ + System.currentTimeMillis());
}
return topicPreferredRouteFileName;
}
- public static String createConsumerPropFile(String topic, Properties props)throws IOException {
+ public static String createConsumerPropFile(String topic, Properties props) throws IOException {
String defaultProfFileName = "consumer.properties";
- return createConsumerProducerPropFile(topic, defaultProfFileName,props);
+
+ log.debug("Creating DMaaP Consumer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
}
- public static String createProducerPropFile(String topic, Properties props)throws IOException {
+ public static String createProducerPropFile(String topic, Properties props) throws IOException {
String defaultProfFileName = "producer.properties";
- return createConsumerProducerPropFile(topic, defaultProfFileName,props);
+
+ log.debug("Creating DMaaP Producer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
}
- private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException {
- ClassLoader classLoader = DmaapUtil.class.getClassLoader();
- InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName);
- Properties defaultProps = new Properties();
- defaultProps.load(inputStream);
- defaultProps.setProperty("topic",topic);
+ private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props)
+ throws IOException {
+ Properties defaultProps = getDefaultProperties(defaultProfFileName);
+
+ defaultProps.setProperty("topic", topic);
String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic);
- if(props != null && !props.isEmpty()){
+ if (props != null && !props.isEmpty()) {
defaultProps.putAll(props);
}
- defaultProps.setProperty("topic",topic);
- defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName);
+ defaultProps.setProperty("topic", topic);
+ defaultProps.setProperty("DME2preferredRouterFilePath", preferredRouteFileName);
String id = defaultProps.getProperty("id");
String topicConsumerPropFileName = defaultProfFileName;
- topicConsumerPropFileName = id != null ? id+ DELIMITER +topicConsumerPropFileName : DELIMITER +topicConsumerPropFileName;
- topicConsumerPropFileName = topic != null ? topic+ DELIMITER +topicConsumerPropFileName : DELIMITER +topicConsumerPropFileName;
+ topicConsumerPropFileName = id != null ? id + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
+ topicConsumerPropFileName = topic != null ? topic + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
- defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis());
+ defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName
+ + " file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis());
return topicConsumerPropFileName;
}
+ private static Properties getDefaultProperties(String profileName) {
+ Properties props = new Properties();
+
+ // use appc configuration to get all properties which includes
+ // appc.properties and system properties
+ // allowing variable to be set in any location
+ Configuration config = ConfigurationFactory.getConfiguration();
+ String dmaapPropPath = config.getProperty(DMAAP_PROPERTIES_PATH);
+
+ if (dmaapPropPath != null) {
+ // load from file system
+
+ File profileFile = new File(dmaapPropPath, profileName);
+ FileInputStream inputStream = null;
+
+ log.info("Loading DMaaP Profile from " + profileFile.getAbsolutePath());
+
+ if (profileFile.exists()) {
+ try {
+ inputStream = new FileInputStream(profileFile);
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading DMaaP Profile from " + profileFile.getAbsolutePath(), e);
+ } finally {
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException ex) {
+ log.warn("Exception closing DMaaP Profile file " + profileFile.getAbsolutePath(), ex);
+ }
+ }
+ }
+ }
+ if (props.isEmpty()) {
+ // load default Profile from class
+ log.info("Loading Default DMaaP Profile");
+
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(profileName);
+ try {
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading Default DMaaP Profile", e);
+ }
+ }
+
+ return props;
+ }
}