summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java161
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java117
2 files changed, 141 insertions, 137 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
index 155f34bfc..6f907ae20 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
@@ -24,27 +24,24 @@
package org.onap.appc.adapter.messaging.dmaap.impl;
-import java.io.IOException;
-import java.util.*;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.nsa.cambria.client.CambriaClientBuilders;
-//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-//import com.att.nsa.cambria.client.CambriaConsumer;
-
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.onap.appc.adapter.message.Consumer;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.metricservice.MetricRegistry;
import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.impl.MetricServiceImpl;
+import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.metric.Metric;
import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.policy.PublishingPolicy;
import org.onap.appc.metricservice.publisher.LogPublisher;
import org.osgi.framework.BundleContext;
@@ -53,74 +50,81 @@ import org.osgi.framework.ServiceReference;
public class DmaapConsumerImpl implements Consumer {
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
private final Configuration configuration = ConfigurationFactory.getConfiguration();
// Default values
private static final int DEFAULT_TIMEOUT_MS = 60000;
private static final int DEFAULT_LIMIT = 1000;
- private static MetricRegistry metricRegistry;
private String topic;
- private boolean isMetricEnabled=false;
+ private boolean isMetricEnabled = false;
private boolean useHttps = false;
+ private MetricRegistry metricRegistry;
private MRConsumer client = null;
private Properties props = null;
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) {
- this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+ public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
+ String user, String password) {
+ this(urls, topicName, consumerGroupName, consumerId,user, password,null);
}
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) {
+ public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
+ String user, String password, String filter) {
+
this.topic = topicName;
this.props = new Properties();
String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host",urlsStr);
- props.setProperty("group",consumerGroupName);
- props.setProperty("id",consumerId);
- props.setProperty("username",user);
- props.setProperty("password",password);
- if(filter != null) {
+ props.setProperty("host", urlsStr);
+ props.setProperty("group", consumerGroupName);
+ props.setProperty("id", consumerId);
+ props.setProperty("username", user);
+ props.setProperty("password", password);
+ if (filter != null) {
props.setProperty("filter", filter);
}
}
-
private void initMetric() {
LOG.debug("Metric getting initialized");
MetricService metricService = getMetricservice();
- metricRegistry = metricService.createRegistry("APPC");
- DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
- dmaapRequestCounterBuilder().
- withName("DMAAP_KPI").withType(MetricType.COUNTER).
- withRecievedMessage(0)
+ if (metricService != null) {
+ metricRegistry = metricService.createRegistry("APPC");
+
+ DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
+ .dmaapRequestCounterBuilder()
+ .withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0)
.withPublishedMessage(0)
.build();
- if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
- scheduledPolicyBuilder().withPublishers(logPublishers).
- withMetrics(metrics).
- build();
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
+
+ if (metricRegistry.register(dmaapKpiMetric)) {
+ Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+ LogPublisher[] logPublishers = new LogPublisher[1];
+ logPublishers[0] = logPublisher;
+
+ PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
+ .scheduledPolicyBuilder().withPublishers(logPublishers)
+ .withMetrics(metrics)
+ .build();
+
+ LOG.debug("Policy getting initialized");
+ manuallyScheduledPublishingPolicy.init();
+ LOG.debug("Metric initialized");
+ }
}
}
-
/**
- * @return An instance of MRConsumer created from our class variables
+ * @return An instance of MRConsumer created from our class variables.
*/
private synchronized MRConsumer getClient(int waitMs, int limit) {
try {
props.setProperty("timeout",String.valueOf(waitMs));
props.setProperty("limit",String.valueOf(limit));
String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
- return MRClientFactory.createConsumer ( topicProducerPropFileName);
+ return MRClientFactory.createConsumer(topicProducerPropFileName);
} catch (IOException e1) {
LOG.error("failed to createConsumer",e1);
return null;
@@ -136,12 +140,17 @@ public class DmaapConsumerImpl implements Consumer {
}
@Override
+ public List<String> fetch() {
+ return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ @Override
public List<String> fetch(int waitMs, int limit) {
- Properties properties=configuration.getProperties();
- if(properties!=null && properties.getProperty("metric.enabled")!=null ){
- isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+ Properties properties = configuration.getProperties();
+ if (properties != null && properties.getProperty("metric.enabled") != null) {
+ isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
}
- if(isMetricEnabled){
+ if (isMetricEnabled) {
initMetric();
}
LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
@@ -153,32 +162,36 @@ public class DmaapConsumerImpl implements Consumer {
LOG.info("Getting DMaaP Client ...");
client = getClient(waitMs, limit);
}
- try {
- for (String s : client.fetch(waitMs, limit)) {
- out.add(s);
- if(isMetricEnabled){
- ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
- }
- }
- LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
- } catch (Exception e) {
- // Connection exception
- LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()));
- e.printStackTrace();
+ if (client != null) {
try {
- LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
- Thread.sleep(waitMs);
- } catch (InterruptedException e2) {
- LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+ for (String s : client.fetch(waitMs, limit)) {
+ out.add(s);
+ incrementReceivedMessage();
+ }
+ LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
+ } catch (Exception e) {
+ // Connection exception
+ LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e);
+ try {
+ LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e2) {
+ LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+ Thread.currentThread().interrupt();
+ }
}
}
-
-
return out;
}
+ private void incrementReceivedMessage() {
+ if (isMetricEnabled && metricRegistry != null) {
+ ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
+ }
+ }
+
/**
- * Close consumer Dmaap client
+ * Close consumer Dmaap client.
*/
@Override
public void close() {
@@ -189,15 +202,10 @@ public class DmaapConsumerImpl implements Consumer {
}
@Override
- public List<String> fetch() {
- return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
-
- @Override
public String toString() {
- String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host"));
- String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group"));
- String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id"));
+ String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host"));
+ String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group"));
+ String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id"));
return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
}
@@ -206,22 +214,17 @@ public class DmaapConsumerImpl implements Consumer {
useHttps = yes;
}
-
private MetricService getMetricservice() {
BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
- // Get AAIadapter reference
ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
if (sref != null) {
LOG.info("Metric Service from bundlecontext");
- return (MetricServiceImpl) bctx.getService(sref);
-
+ return (MetricService) bctx.getService(sref);
} else {
LOG.info("Metric Service error from bundlecontext");
LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
return null;
-
}
}
-
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
index 5f91ca33b..3fbfc95c6 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
@@ -24,46 +24,44 @@
package org.onap.appc.adapter.messaging.dmaap.impl;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.MRClientFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.metricservice.MetricRegistry;
import org.onap.appc.metricservice.MetricService;
+import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.metric.Metric;
import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
import org.onap.appc.metricservice.policy.PublishingPolicy;
import org.onap.appc.metricservice.publisher.LogPublisher;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
public class DmaapProducerImpl implements Producer {
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
private static final Configuration configuration = ConfigurationFactory.getConfiguration();
- private Set<String> topics = new HashSet<String>();
+ private Set<String> topics;
private Properties props = null;
- private static MetricRegistry metricRegistry;
+ private MetricRegistry metricRegistry;
private boolean useHttps = false;
- private boolean isMetricEnabled=false;
+ private boolean isMetricEnabled = false;
private Set<MRBatchingPublisher> clients;
@@ -72,16 +70,14 @@ public class DmaapProducerImpl implements Producer {
this(urls, (Set<String>)null, user, password);
this.topics = new HashSet<>();
if (topicName != null) {
- for (String topic : topicName.split(",")) {
- topics.add(topic);
- }
+ Collections.addAll(topics, topicName.split(","));
}
}
public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
topics = topicNames;
- if(urls == null || user == null || password == null){
- throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" );
+ if (urls == null || user == null || password == null) {
+ throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password");
}
this.props = new Properties();
String urlsStr = StringUtils.join(urls, ',');
@@ -90,62 +86,69 @@ public class DmaapProducerImpl implements Producer {
props.setProperty("username",user);
props.setProperty("password",password);
}
+
private void initMetric() {
LOG.debug("Metric getting initialized");
MetricService metricService = getMetricservice();
- metricRegistry=metricService.createRegistry("APPC");
- DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
- dmaapRequestCounterBuilder().
- withName("DMAAP_KPI").withType(MetricType.COUNTER).
- withRecievedMessage(0)
+ if (metricService != null) {
+ metricRegistry = metricService.createRegistry("APPC");
+
+ DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
+ .dmaapRequestCounterBuilder()
+ .withName("DMAAP_KPI").withType(MetricType.COUNTER)
+ .withRecievedMessage(0)
.withPublishedMessage(0)
.build();
- if(metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
- scheduledPolicyBuilder().withPublishers(logPublishers).
- withMetrics(metrics).
- build();
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
+ if (metricRegistry.register(dmaapKpiMetric)) {
+ Metric[] metrics = new Metric[]{dmaapKpiMetric};
+ LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+ LogPublisher[] logPublishers = new LogPublisher[1];
+ logPublishers[0] = logPublisher;
+
+ PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
+ .scheduledPolicyBuilder()
+ .withPublishers(logPublishers)
+ .withMetrics(metrics)
+ .build();
+
+ LOG.debug("Policy getting initialized");
+ manuallyScheduledPublishingPolicy.init();
+ LOG.debug("Metric initialized");
+ }
+ }
}
+
private Set<MRBatchingPublisher> getClients() {
Set<MRBatchingPublisher> out = new HashSet<>();
for (String topic : topics) {
try {
String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
- final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName);
+ final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
out.add(client);
} catch (Exception e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
}
}
-
return out;
}
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("user",String.valueOf(key));
- props.setProperty("password",String.valueOf(secret));
+ props.setProperty("user", String.valueOf(key));
+ props.setProperty("password", String.valueOf(secret));
clients = null;
}
@Override
public boolean post(String partition, String data) {
boolean success = true;
- Properties properties=configuration.getProperties();
- if(properties!=null && properties.getProperty("metric.enabled")!=null ){
- isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+ Properties properties = configuration.getProperties();
+ if (properties != null && properties.getProperty("metric.enabled") != null ) {
+ isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
}
- if(isMetricEnabled){
+ if (isMetricEnabled) {
initMetric();
}
@@ -161,31 +164,34 @@ public class DmaapProducerImpl implements Producer {
LOG.debug(String.format("Posting %s to %s", data, client));
client.send(partition, data);
} catch (IOException e) {
- LOG.error(e.getMessage());
+ LOG.error(e.getMessage(), e);
success = false;
}
}
- if(isMetricEnabled){
- ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
- }
+ incrementPublishedMessage();
return success;
}
+ private void incrementPublishedMessage() {
+ if (isMetricEnabled && metricRegistry != null) {
+ ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
+ }
+ }
+
/**
- * Close producer Dmaap client
+ * Close producer Dmaap client.
*/
@Override
public void close() {
if ((clients == null) || (clients.isEmpty())) {
return;
}
-
LOG.debug("Closing Dmaap producer clients....");
for (MRBatchingPublisher client : clients) {
try {
client.close(1, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
- LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
+ LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
}
}
}
@@ -201,16 +207,11 @@ public class DmaapProducerImpl implements Producer {
if (sref != null) {
LOG.info("Metric Service from bundlecontext");
return (MetricService) bctx.getService(sref);
-
} else {
LOG.info("Metric Service error from bundlecontext");
LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
return null;
-
}
}
- public Metric getMetric(String name){
- return metricRegistry.metric(name);
- }
}