diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter')
2 files changed, 141 insertions, 137 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java index 155f34bfc..6f907ae20 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -24,27 +24,24 @@ package org.onap.appc.adapter.messaging.dmaap.impl; -import java.io.IOException; -import java.util.*; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -//import com.att.nsa.cambria.client.CambriaConsumer; - import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.onap.appc.adapter.message.Consumer; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.onap.appc.metricservice.MetricRegistry; import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.impl.MetricServiceImpl; +import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; import org.onap.appc.metricservice.metric.Metric; import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; import org.onap.appc.metricservice.policy.PublishingPolicy; import org.onap.appc.metricservice.publisher.LogPublisher; import org.osgi.framework.BundleContext; @@ -53,74 +50,81 @@ import org.osgi.framework.ServiceReference; public class DmaapConsumerImpl implements Consumer { - private final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); private final Configuration configuration = ConfigurationFactory.getConfiguration(); // Default values private static final int DEFAULT_TIMEOUT_MS = 60000; private static final int DEFAULT_LIMIT = 1000; - private static MetricRegistry metricRegistry; private String topic; - private boolean isMetricEnabled=false; + private boolean isMetricEnabled = false; private boolean useHttps = false; + private MetricRegistry metricRegistry; private MRConsumer client = null; private Properties props = null; - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { - this(urls, topicName, consumerGroupName, consumerId,user, password,null); + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, + String user, String password) { + this(urls, topicName, consumerGroupName, consumerId,user, password,null); } - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, + String user, String password, String filter) { + this.topic = topicName; this.props = new Properties(); String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); - props.setProperty("group",consumerGroupName); - props.setProperty("id",consumerId); - props.setProperty("username",user); - props.setProperty("password",password); - if(filter != null) { + props.setProperty("host", urlsStr); + props.setProperty("group", consumerGroupName); + props.setProperty("id", consumerId); + props.setProperty("username", user); + props.setProperty("password", password); + if (filter != null) { props.setProperty("filter", filter); } } - private void initMetric() { LOG.debug("Metric getting initialized"); MetricService metricService = getMetricservice(); - metricRegistry = metricService.createRegistry("APPC"); - DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) + if (metricService != null) { + metricRegistry = metricService.createRegistry("APPC"); + + DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() + .dmaapRequestCounterBuilder() + .withName("DMAAP_KPI").withType(MetricType.COUNTER) + .withRecievedMessage(0) .withPublishedMessage(0) .build(); - if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); + + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() + .scheduledPolicyBuilder().withPublishers(logPublishers) + .withMetrics(metrics) + .build(); + + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } } } - /** - * @return An instance of MRConsumer created from our class variables + * @return An instance of MRConsumer created from our class variables. */ private synchronized MRConsumer getClient(int waitMs, int limit) { try { props.setProperty("timeout",String.valueOf(waitMs)); props.setProperty("limit",String.valueOf(limit)); String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); - return MRClientFactory.createConsumer ( topicProducerPropFileName); + return MRClientFactory.createConsumer(topicProducerPropFileName); } catch (IOException e1) { LOG.error("failed to createConsumer",e1); return null; @@ -136,12 +140,17 @@ public class DmaapConsumerImpl implements Consumer { } @Override + public List<String> fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override public List<String> fetch(int waitMs, int limit) { - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + Properties properties = configuration.getProperties(); + if (properties != null && properties.getProperty("metric.enabled") != null) { + isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled")); } - if(isMetricEnabled){ + if (isMetricEnabled) { initMetric(); } LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); @@ -153,32 +162,36 @@ public class DmaapConsumerImpl implements Consumer { LOG.info("Getting DMaaP Client ..."); client = getClient(waitMs, limit); } - try { - for (String s : client.fetch(waitMs, limit)) { - out.add(s); - if(isMetricEnabled){ - ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); - } - } - LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); - } catch (Exception e) { - // Connection exception - LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); - e.printStackTrace(); + if (client != null) { try { - LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); - Thread.sleep(waitMs); - } catch (InterruptedException e2) { - LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + for (String s : client.fetch(waitMs, limit)) { + out.add(s); + incrementReceivedMessage(); + } + LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); + } catch (Exception e) { + // Connection exception + LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e); + try { + LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); + Thread.sleep(waitMs); + } catch (InterruptedException e2) { + LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + Thread.currentThread().interrupt(); + } } } - - return out; } + private void incrementReceivedMessage() { + if (isMetricEnabled && metricRegistry != null) { + ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); + } + } + /** - * Close consumer Dmaap client + * Close consumer Dmaap client. */ @Override public void close() { @@ -189,15 +202,10 @@ public class DmaapConsumerImpl implements Consumer { } @Override - public List<String> fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override public String toString() { - String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); - String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); - String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); + String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host")); + String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group")); + String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id")); return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); } @@ -206,22 +214,17 @@ public class DmaapConsumerImpl implements Consumer { useHttps = yes; } - private MetricService getMetricservice() { BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - // Get AAIadapter reference ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); if (sref != null) { LOG.info("Metric Service from bundlecontext"); - return (MetricServiceImpl) bctx.getService(sref); - + return (MetricService) bctx.getService(sref); } else { LOG.info("Metric Service error from bundlecontext"); LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); return null; - } } - } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java index 5f91ca33b..3fbfc95c6 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -24,46 +24,44 @@ package org.onap.appc.adapter.messaging.dmaap.impl; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.mr.client.MRBatchingPublisher; import com.att.nsa.mr.client.MRClientFactory; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.onap.appc.metricservice.MetricRegistry; import org.onap.appc.metricservice.MetricService; +import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; import org.onap.appc.metricservice.metric.Metric; import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; import org.onap.appc.metricservice.policy.PublishingPolicy; import org.onap.appc.metricservice.publisher.LogPublisher; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceReference; -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - public class DmaapProducerImpl implements Producer { private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - private Set<String> topics = new HashSet<String>(); + private Set<String> topics; private Properties props = null; - private static MetricRegistry metricRegistry; + private MetricRegistry metricRegistry; private boolean useHttps = false; - private boolean isMetricEnabled=false; + private boolean isMetricEnabled = false; private Set<MRBatchingPublisher> clients; @@ -72,16 +70,14 @@ public class DmaapProducerImpl implements Producer { this(urls, (Set<String>)null, user, password); this.topics = new HashSet<>(); if (topicName != null) { - for (String topic : topicName.split(",")) { - topics.add(topic); - } + Collections.addAll(topics, topicName.split(",")); } } public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { topics = topicNames; - if(urls == null || user == null || password == null){ - throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); + if (urls == null || user == null || password == null) { + throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password"); } this.props = new Properties(); String urlsStr = StringUtils.join(urls, ','); @@ -90,62 +86,69 @@ public class DmaapProducerImpl implements Producer { props.setProperty("username",user); props.setProperty("password",password); } + private void initMetric() { LOG.debug("Metric getting initialized"); MetricService metricService = getMetricservice(); - metricRegistry=metricService.createRegistry("APPC"); - DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) + if (metricService != null) { + metricRegistry = metricService.createRegistry("APPC"); + + DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() + .dmaapRequestCounterBuilder() + .withName("DMAAP_KPI").withType(MetricType.COUNTER) + .withRecievedMessage(0) .withPublishedMessage(0) .build(); - if(metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() + .scheduledPolicyBuilder() + .withPublishers(logPublishers) + .withMetrics(metrics) + .build(); + + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + } } + private Set<MRBatchingPublisher> getClients() { Set<MRBatchingPublisher> out = new HashSet<>(); for (String topic : topics) { try { String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); - final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); + final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName); out.add(client); } catch (Exception e) { - LOG.error(e.getMessage()); + LOG.error(e.getMessage(), e); } } - return out; } @Override public synchronized void updateCredentials(String key, String secret) { LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - props.setProperty("user",String.valueOf(key)); - props.setProperty("password",String.valueOf(secret)); + props.setProperty("user", String.valueOf(key)); + props.setProperty("password", String.valueOf(secret)); clients = null; } @Override public boolean post(String partition, String data) { boolean success = true; - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + Properties properties = configuration.getProperties(); + if (properties != null && properties.getProperty("metric.enabled") != null ) { + isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled")); } - if(isMetricEnabled){ + if (isMetricEnabled) { initMetric(); } @@ -161,31 +164,34 @@ public class DmaapProducerImpl implements Producer { LOG.debug(String.format("Posting %s to %s", data, client)); client.send(partition, data); } catch (IOException e) { - LOG.error(e.getMessage()); + LOG.error(e.getMessage(), e); success = false; } } - if(isMetricEnabled){ - ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); - } + incrementPublishedMessage(); return success; } + private void incrementPublishedMessage() { + if (isMetricEnabled && metricRegistry != null) { + ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); + } + } + /** - * Close producer Dmaap client + * Close producer Dmaap client. */ @Override public void close() { if ((clients == null) || (clients.isEmpty())) { return; } - LOG.debug("Closing Dmaap producer clients...."); for (MRBatchingPublisher client : clients) { try { client.close(1, TimeUnit.SECONDS); } catch (IOException | InterruptedException e) { - LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); + LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e); } } } @@ -201,16 +207,11 @@ public class DmaapProducerImpl implements Producer { if (sref != null) { LOG.info("Metric Service from bundlecontext"); return (MetricService) bctx.getService(sref); - } else { LOG.info("Metric Service error from bundlecontext"); LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); return null; - } } - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } } |