summaryrefslogtreecommitdiffstats
path: root/appc-event-listener/appc-event-listener-bundle
diff options
context:
space:
mode:
Diffstat (limited to 'appc-event-listener/appc-event-listener-bundle')
-rw-r--r--appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java130
1 files changed, 64 insertions, 66 deletions
diff --git a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
index 1f1154f78..506593ed1 100644
--- a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
+++ b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
@@ -23,10 +23,9 @@
*/
package org.openecomp.appc.listener.impl;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
-import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
import org.openecomp.appc.adapter.factory.MessageService;
import org.openecomp.appc.adapter.message.Consumer;
import org.openecomp.appc.adapter.message.MessageAdapterFactory;
@@ -40,12 +39,15 @@ import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.slf4j.MDC;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
/**
- * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
- * messages are sent and received on DMaaP.
- *
+ * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
+ * that only well formed messages are sent and received on DMaaP.
*/
public class EventHandlerImpl implements EventHandler {
@@ -100,10 +102,10 @@ public class EventHandlerImpl implements EventHandler {
private Consumer reader = null;
private Producer producer = null;
-
+
public EventHandlerImpl(ListenerProperties props) {
- pool = new HashSet<String>();
- writeTopics = new HashSet<String>();
+ pool = new HashSet<>();
+ writeTopics = new HashSet<>();
if (props != null) {
readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
@@ -115,7 +117,7 @@ public class EventHandlerImpl implements EventHandler {
filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
READ_TIMEOUT = Integer
- .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
+ .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
if (hostnames != null && !hostnames.isEmpty()) {
@@ -134,8 +136,8 @@ public class EventHandlerImpl implements EventHandler {
messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
LOG.info(String.format(
- "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
- messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
+ "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
+ messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
}
}
@@ -146,24 +148,18 @@ public class EventHandlerImpl implements EventHandler {
@Override
public List<String> getIncomingEvents(int limit) {
- List<String> out = new ArrayList<String>();
+ List<String> out = new ArrayList<>();
LOG.info(String.format("Getting up to %d incoming events", limit));
// reuse the consumer object instead of creating a new one every time
if (reader == null) {
- LOG.info("Getting Consumer...");
- reader = getConsumer();
+ LOG.info("Getting Consumer...");
+ reader = getConsumer();
}
-
- List<String> items = null;
- try{
- items = reader.fetch(READ_TIMEOUT * 1000, limit);
- }catch(Error r){
- LOG.error("EvenHandlerImpl.getIncomingEvents",r);
- }
-
-
- for (String item : items) {
- out.add(item);
+ if (reader != null) {
+ List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
+ for (String item : items) {
+ out.add(item);
+ }
}
LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
return out;
@@ -189,51 +185,52 @@ public class EventHandlerImpl implements EventHandler {
public void postStatus(String partition, String event) {
LOG.debug(String.format("Posting Message [%s]", event));
if (producer == null) {
- LOG.info("Getting Producer...");
- producer = getProducer();
+ LOG.info("Getting Producer...");
+ producer = getProducer();
}
producer.post(partition, event);
}
/**
* Returns a consumer object for direct access to our Cambria consumer interface
- *
+ *
* @return An instance of the consumer interface
*/
protected Consumer getConsumer() {
LOG.debug(String.format("Getting Consumer: %s %s/%s/%s", pool, readTopic, clientName, clientId));
if (filter_json == null && writeTopics.contains(readTopic)) {
LOG.error(
- "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
+ "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
}
-
- Consumer out=null;
+
+ Consumer out = null;
BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
if (ctx != null) {
- ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
- if (svcRef != null) {
- try{
- out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
- }catch(Error e){
- //TODO:create eelf message
- LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e);
- }
- if( out != null ) {
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- out.useHttps(true);
- break;
- }
- }
- }
- }
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ try {
+ out = ((MessageAdapterFactory) ctx.getService(svcRef))
+ .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
+ } catch (Exception e) {
+ //TODO:create eelf message
+ LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e);
+ }
+ if (out != null) {
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ out.useHttps(true);
+ break;
+ }
+ }
+ }
+ }
}
return out;
}
/**
* Returns a consumer object for direct access to our Cambria producer interface
- *
+ *
* @return An instance of the producer interface
*/
protected Producer getProducer() {
@@ -242,33 +239,34 @@ public class EventHandlerImpl implements EventHandler {
Producer out = null;
BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
if (ctx != null) {
- ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
- if (svcRef != null) {
- out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret);
- if( out != null ) {
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- out.useHttps(true);
- break;
- }
- }
- }
- }
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ out = ((MessageAdapterFactory) ctx.getService(svcRef))
+ .createProducer(pool, writeTopics, apiKey, apiSecret);
+ if (out != null) {
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ out.useHttps(true);
+ break;
+ }
+ }
+ }
+ }
}
return out;
}
@Override
public void closeClients() {
- LOG.debug("Closing Consumer and Producer DMaaP clients");
+ LOG.debug("Closing Consumer and Producer DMaaP clients");
if (reader != null) {
- reader.close();
+ reader.close();
}
if (producer != null) {
- producer.close();
+ producer.close();
}
}
-
+
@Override
public String getClientId() {
return clientId;