diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter')
8 files changed, 80 insertions, 106 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java index 812f80121..a34169e06 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java @@ -28,7 +28,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; + /** @@ -50,7 +50,6 @@ import org.osgi.framework.ServiceRegistration; * </p> */ public class AppcDmaapAdapterActivator implements BundleActivator { - private ServiceRegistration registration = null; /** * The logger to be used diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java index 76b050d8e..e67007f64 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -50,19 +50,23 @@ public class CommonHttpClient { } public HttpGet getReq(URI uri, int timeoutMs) throws Exception { - HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + if (AUTH_STR == null) { + throw new Exception("All DMaaP requests require authentication and none was provided."); } + + HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); out.setConfig(getConfig(timeoutMs)); return out; } public HttpPost postReq(String url) throws Exception { - HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + if (AUTH_STR == null) { + throw new Exception("All DMaaP requests require authentication and none was provided."); } + + HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); out.setConfig(getConfig(0)); return out; } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index df81e9718..6be34553c 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -44,12 +44,11 @@ import org.onap.appc.adapter.message.Consumer; public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; private static final int DEFAULT_LIMIT = 1000; - private static final String HTTPS_PORT = ":3905"; private static final String URL_TEMPLATE = "%s/events/%s/%s/%s"; private List<String> urls; @@ -57,9 +56,6 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer private boolean useHttps = false; - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) { - this(hosts, topicName, consumerName, consumerId, null); - } public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, String filter) { @@ -85,9 +81,9 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public List<String> fetch(int waitMs, int limit) { LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List<String> out = new ArrayList<String>(); + List<String> out = new ArrayList<>(); try { - List<NameValuePair> urlParams = new ArrayList<NameValuePair>(); + List<NameValuePair> urlParams = new ArrayList<>(); urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs))); urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit))); if (filter != null) { @@ -141,7 +137,7 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer @Override public String toString() { - String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0); + String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0); return String.format("Consumer listening to [%s]", hostStr); } @@ -159,9 +155,9 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer } } - @Override - public void close() { - // Nothing to do - } + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index 560c09be4..74c0c26b2 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -41,7 +41,7 @@ import org.onap.appc.adapter.message.Producer; public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); private static final String CONTENT_TYPE = "application/cambria"; private static final String URL_TEMPLATE = "%s/events/%s"; @@ -52,8 +52,8 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer private boolean useHttps = false; public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { - hosts = new ArrayList<String>(); - topics = new HashSet<String>(); + hosts = new ArrayList<>(); + topics = new HashSet<>(); topics.add(topicName); for (String host : urls) { @@ -61,14 +61,6 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer } } - public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) { - hosts = new ArrayList<String>(); - topics = topicNames; - - for (String host : urls) { - hosts.add(formatHostString(host)); - } - } @Override public void updateCredentials(String user, String pass) { @@ -131,8 +123,8 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); } - @Override - public void close() { - // Nothing to do - } + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java index ddf630545..155f34bfc 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -53,14 +53,13 @@ import org.osgi.framework.ServiceReference; public class DmaapConsumerImpl implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private final Configuration configuration = ConfigurationFactory.getConfiguration(); // Default values private static final int DEFAULT_TIMEOUT_MS = 60000; private static final int DEFAULT_LIMIT = 1000; private static MetricRegistry metricRegistry; private String topic; - private DmaapRequestCounterMetric dmaapKpiMetric; private boolean isMetricEnabled=false; private boolean useHttps = false; private MRConsumer client = null; @@ -91,7 +90,7 @@ public class DmaapConsumerImpl implements Consumer { LOG.debug("Metric getting initialized"); MetricService metricService = getMetricservice(); metricRegistry = metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory(). dmaapRequestCounterBuilder(). withName("DMAAP_KPI").withType(MetricType.COUNTER). withRecievedMessage(0) @@ -111,9 +110,7 @@ public class DmaapConsumerImpl implements Consumer { LOG.debug("Metric initialized"); } } - private MRConsumer getClient() { - return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } + /** * @return An instance of MRConsumer created from our class variables @@ -133,10 +130,8 @@ public class DmaapConsumerImpl implements Consumer { @Override public synchronized void updateCredentials(String key, String secret) { LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); + props.setProperty("user",String.valueOf(key)); + props.setProperty("password",String.valueOf(secret)); client = null; } @@ -150,13 +145,13 @@ public class DmaapConsumerImpl implements Consumer { initMetric(); } LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List<String> out = new ArrayList<String>(); + List<String> out = new ArrayList<>(); // Create client once and reuse it on subsequent fetches. This is // to support failover to other servers in the DMaaP cluster. if (client == null) { - LOG.info("Getting DMaaP Client ..."); - client = getClient(waitMs, limit); + LOG.info("Getting DMaaP Client ..."); + client = getClient(waitMs, limit); } try { for (String s : client.fetch(waitMs, limit)) { @@ -187,10 +182,10 @@ public class DmaapConsumerImpl implements Consumer { */ @Override public void close() { - LOG.debug("Closing Dmaap consumer client...."); - if (client != null) { - client.close(); - } + LOG.debug("Closing Dmaap consumer client...."); + if (client != null) { + client.close(); + } } @Override @@ -228,7 +223,5 @@ public class DmaapConsumerImpl implements Consumer { } } - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } + } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java index 7ed06a9e3..888b432b0 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -24,16 +24,9 @@ package org.onap.appc.adapter.messaging.dmaap.impl; -import java.io.*; -import java.util.*; -import java.util.concurrent.TimeUnit; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaBatchingPublisher; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - import com.att.nsa.mr.client.MRBatchingPublisher; import com.att.nsa.mr.client.MRClientFactory; import org.apache.commons.lang3.StringUtils; @@ -51,6 +44,14 @@ import org.onap.appc.metricservice.publisher.LogPublisher; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceReference; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + public class DmaapProducerImpl implements Producer { @@ -62,7 +63,6 @@ public class DmaapProducerImpl implements Producer { private Properties props = null; private static MetricRegistry metricRegistry; private boolean useHttps = false; - private DmaapRequestCounterMetric dmaapKpiMetric; private boolean isMetricEnabled=false; private Set<MRBatchingPublisher> clients; @@ -94,7 +94,7 @@ public class DmaapProducerImpl implements Producer { LOG.debug("Metric getting initialized"); MetricService metricService = getMetricservice(); metricRegistry=metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory(). dmaapRequestCounterBuilder(). withName("DMAAP_KPI").withType(MetricType.COUNTER). withRecievedMessage(0) @@ -116,7 +116,7 @@ public class DmaapProducerImpl implements Producer { } private Set<MRBatchingPublisher> getClients() { - Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>(); + Set<MRBatchingPublisher> out = new HashSet<>(); for (String topic : topics) { try { String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); @@ -133,10 +133,8 @@ public class DmaapProducerImpl implements Producer { @Override public synchronized void updateCredentials(String key, String secret) { LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); + props.setProperty("user",String.valueOf(key)); + props.setProperty("password",String.valueOf(secret)); clients = null; } @@ -154,8 +152,8 @@ public class DmaapProducerImpl implements Producer { // Create clients once and reuse them on subsequent posts. This is // to support failover to other servers in the Dmaap cluster. if ((clients == null) || (clients.isEmpty())) { - LOG.info("Getting CambriaBatchingPublisher Clients ..."); - clients = getClients(); + LOG.info("Getting CambriaBatchingPublisher Clients ..."); + clients = getClients(); } for (MRBatchingPublisher client : clients) { @@ -178,14 +176,14 @@ public class DmaapProducerImpl implements Producer { */ @Override public void close() { - if ((clients == null) || (clients.isEmpty())) { - return; - } + if ((clients == null) || (clients.isEmpty())) { + return; + } - LOG.debug("Closing Dmaap producer clients...."); - for (MRBatchingPublisher client : clients) { + LOG.debug("Closing Dmaap producer clients...."); + for (MRBatchingPublisher client : clients) { try { - client.close(1, TimeUnit.SECONDS); + client.close(1, TimeUnit.SECONDS); } catch (IOException | InterruptedException e) { LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); e.printStackTrace(); @@ -199,10 +197,6 @@ public class DmaapProducerImpl implements Producer { } private MetricService getMetricservice() { -/* - return AppcDmaapAdapterActivator.getMetricService(); -*/ - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); if (sref != null) { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java index bf7649026..1ba474c6c 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -27,24 +27,27 @@ package org.onap.appc.adapter.messaging.dmaap.impl; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.onap.ccsdk.sli.core.sli.SvcLogicContext; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - import org.onap.appc.adapter.message.EventSender; import org.onap.appc.adapter.message.MessageDestination; import org.onap.appc.adapter.message.Producer; import org.onap.appc.adapter.message.event.EventHeader; import org.onap.appc.adapter.message.event.EventMessage; import org.onap.appc.adapter.message.event.EventStatus; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.onap.appc.exceptions.APPCException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + public class EventSenderDmaapImpl implements EventSender { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); + private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; public static final String DMAAP_USERNAME = "dmaap.appc.username"; public static final String DMAAP_PASSWORD = "dmaap.appc.password"; diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/onap/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/onap/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java index 19eb4cea8..cb23f5faa 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/onap/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/onap/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java @@ -30,29 +30,22 @@ import java.util.Set; import org.onap.appc.adapter.message.Consumer; import org.onap.appc.adapter.message.MessageAdapterFactory; import org.onap.appc.adapter.message.Producer; - -import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl; -import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; - import org.onap.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; public class DmaapMessageAdapterFactoryImpl implements MessageAdapterFactory { - //Set to true if the HttpDmaap... should be used instead of the regular Dmaap... - private static final boolean HTTP = true; - - public Producer createProducer(Collection<String> pools, String writeTopic, String apiKey, String apiSecret) { - return HTTP ? new HttpDmaapProducerImpl(pools, writeTopic) : new DmaapProducerImpl(pools, writeTopic,apiKey, apiSecret); - } + + public Producer createProducer(Collection<String> pools, String writeTopic, String apiKey, String apiSecret) { + return new DmaapProducerImpl(pools, writeTopic,apiKey, apiSecret); + } - public Producer createProducer(Collection<String> pools, Set<String> writeTopics, String apiKey, String apiSecret) { - return HTTP ? new HttpDmaapProducerImpl(pools, writeTopics) : new DmaapProducerImpl(pools, writeTopics, apiKey, apiSecret); - } + public Producer createProducer(Collection<String> pools, Set<String> writeTopics, String apiKey, String apiSecret) { + return new DmaapProducerImpl(pools, writeTopics, apiKey, apiSecret); + } - public Consumer createConsumer(Collection<String> pool, String readTopic, - String clientName, String clientId, String filter_json, String apiKey, String apiSecret) { - return HTTP ? new HttpDmaapConsumerImpl(pool, readTopic, clientName, clientId, apiKey, apiSecret, filter_json) : - new DmaapConsumerImpl(pool, readTopic, clientName, clientId, apiKey, apiSecret, filter_json); - } + public Consumer createConsumer(Collection<String> pool, String readTopic, + String clientName, String clientId, String filter_json, String apiKey, String apiSecret) { + return new DmaapConsumerImpl(pool, readTopic, clientName, clientId, apiKey, apiSecret, filter_json); + } } |