diff options
Diffstat (limited to 'appc-event-listener/appc-event-listener-bundle')
-rw-r--r-- | appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java | 130 |
1 files changed, 64 insertions, 66 deletions
diff --git a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java index 1f1154f78..506593ed1 100644 --- a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java +++ b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java @@ -23,10 +23,9 @@ */ package org.openecomp.appc.listener.impl; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - -import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl; import org.openecomp.appc.adapter.factory.MessageService; import org.openecomp.appc.adapter.message.Consumer; import org.openecomp.appc.adapter.message.MessageAdapterFactory; @@ -40,12 +39,15 @@ import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceReference; import org.slf4j.MDC; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** - * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed - * messages are sent and received on DMaaP. - * + * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure + * that only well formed messages are sent and received on DMaaP. */ public class EventHandlerImpl implements EventHandler { @@ -100,10 +102,10 @@ public class EventHandlerImpl implements EventHandler { private Consumer reader = null; private Producer producer = null; - + public EventHandlerImpl(ListenerProperties props) { - pool = new HashSet<String>(); - writeTopics = new HashSet<String>(); + pool = new HashSet<>(); + writeTopics = new HashSet<>(); if (props != null) { readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ); @@ -115,7 +117,7 @@ public class EventHandlerImpl implements EventHandler { filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER); READ_TIMEOUT = Integer - .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT))); + .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT))); String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS); if (hostnames != null && !hostnames.isEmpty()) { @@ -134,8 +136,8 @@ public class EventHandlerImpl implements EventHandler { messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE)); LOG.info(String.format( - "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s", - messageService, hostnames, readTopic, filter_json, writeTopics, apiKey)); + "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s", + messageService, hostnames, readTopic, filter_json, writeTopics, apiKey)); } } @@ -146,24 +148,18 @@ public class EventHandlerImpl implements EventHandler { @Override public List<String> getIncomingEvents(int limit) { - List<String> out = new ArrayList<String>(); + List<String> out = new ArrayList<>(); LOG.info(String.format("Getting up to %d incoming events", limit)); // reuse the consumer object instead of creating a new one every time if (reader == null) { - LOG.info("Getting Consumer..."); - reader = getConsumer(); + LOG.info("Getting Consumer..."); + reader = getConsumer(); } - - List<String> items = null; - try{ - items = reader.fetch(READ_TIMEOUT * 1000, limit); - }catch(Error r){ - LOG.error("EvenHandlerImpl.getIncomingEvents",r); - } - - - for (String item : items) { - out.add(item); + if (reader != null) { + List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit); + for (String item : items) { + out.add(item); + } } LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId)); return out; @@ -189,51 +185,52 @@ public class EventHandlerImpl implements EventHandler { public void postStatus(String partition, String event) { LOG.debug(String.format("Posting Message [%s]", event)); if (producer == null) { - LOG.info("Getting Producer..."); - producer = getProducer(); + LOG.info("Getting Producer..."); + producer = getProducer(); } producer.post(partition, event); } /** * Returns a consumer object for direct access to our Cambria consumer interface - * + * * @return An instance of the consumer interface */ protected Consumer getConsumer() { LOG.debug(String.format("Getting Consumer: %s %s/%s/%s", pool, readTopic, clientName, clientId)); if (filter_json == null && writeTopics.contains(readTopic)) { LOG.error( - "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****"); + "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****"); } - - Consumer out=null; + + Consumer out = null; BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext(); if (ctx != null) { - ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); - if (svcRef != null) { - try{ - out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret); - }catch(Error e){ - //TODO:create eelf message - LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e); - } - if( out != null ) { - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - out.useHttps(true); - break; - } - } - } - } + ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); + if (svcRef != null) { + try { + out = ((MessageAdapterFactory) ctx.getService(svcRef)) + .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret); + } catch (Exception e) { + //TODO:create eelf message + LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e); + } + if (out != null) { + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + out.useHttps(true); + break; + } + } + } + } } return out; } /** * Returns a consumer object for direct access to our Cambria producer interface - * + * * @return An instance of the producer interface */ protected Producer getProducer() { @@ -242,33 +239,34 @@ public class EventHandlerImpl implements EventHandler { Producer out = null; BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext(); if (ctx != null) { - ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); - if (svcRef != null) { - out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret); - if( out != null ) { - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - out.useHttps(true); - break; - } - } - } - } + ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); + if (svcRef != null) { + out = ((MessageAdapterFactory) ctx.getService(svcRef)) + .createProducer(pool, writeTopics, apiKey, apiSecret); + if (out != null) { + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + out.useHttps(true); + break; + } + } + } + } } return out; } @Override public void closeClients() { - LOG.debug("Closing Consumer and Producer DMaaP clients"); + LOG.debug("Closing Consumer and Producer DMaaP clients"); if (reader != null) { - reader.close(); + reader.close(); } if (producer != null) { - producer.close(); + producer.close(); } } - + @Override public String getClientId() { return clientId; |