summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java3
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java16
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java20
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java22
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java35
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java46
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java15
7 files changed, 69 insertions, 88 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
index 812f80121..a34169e06 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
@@ -28,7 +28,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
+
/**
@@ -50,7 +50,6 @@ import org.osgi.framework.ServiceRegistration;
* </p>
*/
public class AppcDmaapAdapterActivator implements BundleActivator {
- private ServiceRegistration registration = null;
/**
* The logger to be used
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
index 76b050d8e..e67007f64 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -50,19 +50,23 @@ public class CommonHttpClient {
}
public HttpGet getReq(URI uri, int timeoutMs) throws Exception {
- HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
- if (AUTH_STR != null) {
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ if (AUTH_STR == null) {
+ throw new Exception("All DMaaP requests require authentication and none was provided.");
}
+
+ HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
+ out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
out.setConfig(getConfig(timeoutMs));
return out;
}
public HttpPost postReq(String url) throws Exception {
- HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
- if (AUTH_STR != null) {
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ if (AUTH_STR == null) {
+ throw new Exception("All DMaaP requests require authentication and none was provided.");
}
+
+ HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
+ out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
out.setConfig(getConfig(0));
return out;
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
index df81e9718..6be34553c 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -44,12 +44,11 @@ import org.onap.appc.adapter.message.Consumer;
public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
// Default values
private static final int DEFAULT_TIMEOUT_MS = 15000;
private static final int DEFAULT_LIMIT = 1000;
- private static final String HTTPS_PORT = ":3905";
private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
private List<String> urls;
@@ -57,9 +56,6 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
private boolean useHttps = false;
- public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
- this(hosts, topicName, consumerName, consumerId, null);
- }
public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
String filter) {
@@ -85,9 +81,9 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public List<String> fetch(int waitMs, int limit) {
LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
try {
- List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
+ List<NameValuePair> urlParams = new ArrayList<>();
urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
if (filter != null) {
@@ -141,7 +137,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
@Override
public String toString() {
- String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
+ String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0);
return String.format("Consumer listening to [%s]", hostStr);
}
@@ -159,9 +155,9 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
}
}
- @Override
- public void close() {
- // Nothing to do
- }
+ @Override
+ public void close() {
+ // Nothing to do
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
index 560c09be4..74c0c26b2 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -41,7 +41,7 @@ import org.onap.appc.adapter.message.Producer;
public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
private static final String CONTENT_TYPE = "application/cambria";
private static final String URL_TEMPLATE = "%s/events/%s";
@@ -52,8 +52,8 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
private boolean useHttps = false;
public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
- hosts = new ArrayList<String>();
- topics = new HashSet<String>();
+ hosts = new ArrayList<>();
+ topics = new HashSet<>();
topics.add(topicName);
for (String host : urls) {
@@ -61,14 +61,6 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
}
}
- public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) {
- hosts = new ArrayList<String>();
- topics = topicNames;
-
- for (String host : urls) {
- hosts.add(formatHostString(host));
- }
- }
@Override
public void updateCredentials(String user, String pass) {
@@ -131,8 +123,8 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
}
- @Override
- public void close() {
- // Nothing to do
- }
+ @Override
+ public void close() {
+ // Nothing to do
+ }
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
index ddf630545..155f34bfc 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
@@ -53,14 +53,13 @@ import org.osgi.framework.ServiceReference;
public class DmaapConsumerImpl implements Consumer {
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+ private final Configuration configuration = ConfigurationFactory.getConfiguration();
// Default values
private static final int DEFAULT_TIMEOUT_MS = 60000;
private static final int DEFAULT_LIMIT = 1000;
private static MetricRegistry metricRegistry;
private String topic;
- private DmaapRequestCounterMetric dmaapKpiMetric;
private boolean isMetricEnabled=false;
private boolean useHttps = false;
private MRConsumer client = null;
@@ -91,7 +90,7 @@ public class DmaapConsumerImpl implements Consumer {
LOG.debug("Metric getting initialized");
MetricService metricService = getMetricservice();
metricRegistry = metricService.createRegistry("APPC");
- dmaapKpiMetric = metricRegistry.metricBuilderFactory().
+ DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
dmaapRequestCounterBuilder().
withName("DMAAP_KPI").withType(MetricType.COUNTER).
withRecievedMessage(0)
@@ -111,9 +110,7 @@ public class DmaapConsumerImpl implements Consumer {
LOG.debug("Metric initialized");
}
}
- private MRConsumer getClient() {
- return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
+
/**
* @return An instance of MRConsumer created from our class variables
@@ -133,10 +130,8 @@ public class DmaapConsumerImpl implements Consumer {
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- String user = key;
- String password = secret;
- props.setProperty("user",String.valueOf(user));
- props.setProperty("password",String.valueOf(password));
+ props.setProperty("user",String.valueOf(key));
+ props.setProperty("password",String.valueOf(secret));
client = null;
}
@@ -150,13 +145,13 @@ public class DmaapConsumerImpl implements Consumer {
initMetric();
}
LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
// Create client once and reuse it on subsequent fetches. This is
// to support failover to other servers in the DMaaP cluster.
if (client == null) {
- LOG.info("Getting DMaaP Client ...");
- client = getClient(waitMs, limit);
+ LOG.info("Getting DMaaP Client ...");
+ client = getClient(waitMs, limit);
}
try {
for (String s : client.fetch(waitMs, limit)) {
@@ -187,10 +182,10 @@ public class DmaapConsumerImpl implements Consumer {
*/
@Override
public void close() {
- LOG.debug("Closing Dmaap consumer client....");
- if (client != null) {
- client.close();
- }
+ LOG.debug("Closing Dmaap consumer client....");
+ if (client != null) {
+ client.close();
+ }
}
@Override
@@ -228,7 +223,5 @@ public class DmaapConsumerImpl implements Consumer {
}
}
- public Metric getMetric(String name){
- return metricRegistry.metric(name);
- }
+
}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
index 7ed06a9e3..888b432b0 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
@@ -24,16 +24,9 @@
package org.onap.appc.adapter.messaging.dmaap.impl;
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-//import com.att.nsa.cambria.client.CambriaClientBuilders;
-//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-
import com.att.nsa.mr.client.MRBatchingPublisher;
import com.att.nsa.mr.client.MRClientFactory;
import org.apache.commons.lang3.StringUtils;
@@ -51,6 +44,14 @@ import org.onap.appc.metricservice.publisher.LogPublisher;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
public class DmaapProducerImpl implements Producer {
@@ -62,7 +63,6 @@ public class DmaapProducerImpl implements Producer {
private Properties props = null;
private static MetricRegistry metricRegistry;
private boolean useHttps = false;
- private DmaapRequestCounterMetric dmaapKpiMetric;
private boolean isMetricEnabled=false;
private Set<MRBatchingPublisher> clients;
@@ -94,7 +94,7 @@ public class DmaapProducerImpl implements Producer {
LOG.debug("Metric getting initialized");
MetricService metricService = getMetricservice();
metricRegistry=metricService.createRegistry("APPC");
- dmaapKpiMetric = metricRegistry.metricBuilderFactory().
+ DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
dmaapRequestCounterBuilder().
withName("DMAAP_KPI").withType(MetricType.COUNTER).
withRecievedMessage(0)
@@ -116,7 +116,7 @@ public class DmaapProducerImpl implements Producer {
}
private Set<MRBatchingPublisher> getClients() {
- Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>();
+ Set<MRBatchingPublisher> out = new HashSet<>();
for (String topic : topics) {
try {
String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
@@ -133,10 +133,8 @@ public class DmaapProducerImpl implements Producer {
@Override
public synchronized void updateCredentials(String key, String secret) {
LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- String user = key;
- String password = secret;
- props.setProperty("user",String.valueOf(user));
- props.setProperty("password",String.valueOf(password));
+ props.setProperty("user",String.valueOf(key));
+ props.setProperty("password",String.valueOf(secret));
clients = null;
}
@@ -154,8 +152,8 @@ public class DmaapProducerImpl implements Producer {
// Create clients once and reuse them on subsequent posts. This is
// to support failover to other servers in the Dmaap cluster.
if ((clients == null) || (clients.isEmpty())) {
- LOG.info("Getting CambriaBatchingPublisher Clients ...");
- clients = getClients();
+ LOG.info("Getting CambriaBatchingPublisher Clients ...");
+ clients = getClients();
}
for (MRBatchingPublisher client : clients) {
@@ -178,14 +176,14 @@ public class DmaapProducerImpl implements Producer {
*/
@Override
public void close() {
- if ((clients == null) || (clients.isEmpty())) {
- return;
- }
+ if ((clients == null) || (clients.isEmpty())) {
+ return;
+ }
- LOG.debug("Closing Dmaap producer clients....");
- for (MRBatchingPublisher client : clients) {
+ LOG.debug("Closing Dmaap producer clients....");
+ for (MRBatchingPublisher client : clients) {
try {
- client.close(1, TimeUnit.SECONDS);
+ client.close(1, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
e.printStackTrace();
@@ -199,10 +197,6 @@ public class DmaapProducerImpl implements Producer {
}
private MetricService getMetricservice() {
-/*
- return AppcDmaapAdapterActivator.getMetricService();
-*/
-
BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
if (sref != null) {
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
index bf7649026..1ba474c6c 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
@@ -27,24 +27,27 @@ package org.onap.appc.adapter.messaging.dmaap.impl;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.onap.appc.adapter.message.EventSender;
import org.onap.appc.adapter.message.MessageDestination;
import org.onap.appc.adapter.message.Producer;
import org.onap.appc.adapter.message.event.EventHeader;
import org.onap.appc.adapter.message.event.EventMessage;
import org.onap.appc.adapter.message.event.EventStatus;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.exceptions.APPCException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
public class EventSenderDmaapImpl implements EventSender
{
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
+ private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
public static final String DMAAP_USERNAME = "dmaap.appc.username";
public static final String DMAAP_PASSWORD = "dmaap.appc.password";