aboutsummaryrefslogtreecommitdiffstats
path: root/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java')
-rw-r--r--appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java112
1 files changed, 59 insertions, 53 deletions
diff --git a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
index 590afbe8b..ee9f30ea7 100644
--- a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
+++ b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java
@@ -20,29 +20,25 @@
*/
package org.openecomp.appc.listener.impl;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.openecomp.appc.adapter.dmaap.Consumer;
-import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
-import org.openecomp.appc.adapter.dmaap.Producer;
-import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
+import org.openecomp.appc.adapter.factory.MessageService;
+import org.openecomp.appc.adapter.message.Consumer;
+import org.openecomp.appc.adapter.message.MessageAdapterFactory;
+import org.openecomp.appc.adapter.message.Producer;
import org.openecomp.appc.listener.EventHandler;
import org.openecomp.appc.listener.ListenerProperties;
-import org.openecomp.appc.listener.ListenerProperties.MessageService;
import org.openecomp.appc.listener.util.Mapper;
import org.openecomp.appc.logging.LoggingConstants;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
import org.slf4j.MDC;
+import java.util.*;
+
/**
* This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
* messages are sent and received on DMaaP.
@@ -154,7 +150,16 @@ public class EventHandlerImpl implements EventHandler {
LOG.info("Getting Consumer...");
reader = getConsumer();
}
- for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) {
+
+ List<String> items = null;
+ try{
+ items = reader.fetch(READ_TIMEOUT * 1000, limit);
+ }catch(Error r){
+ LOG.error("EvenHandlerImpl.getIncomingEvents",r);
+ }
+
+
+ for (String item : items) {
out.add(item);
}
LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
@@ -198,13 +203,25 @@ public class EventHandlerImpl implements EventHandler {
LOG.error(
"*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
}
- Consumer out;
- out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- out.useHttps(true);
- break;
- }
+
+ Consumer out=null;
+ BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
+ if (ctx != null) {
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ try{
+ out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
+ }catch(Error e){
+ //TODO:create eelf message
+ LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e);
+ }
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ out.useHttps(true);
+ break;
+ }
+ }
+ }
}
return out;
}
@@ -217,18 +234,19 @@ public class EventHandlerImpl implements EventHandler {
protected Producer getProducer() {
LOG.debug(String.format("Getting Producer: %s %s", pool, readTopic));
- Producer out;
- out = new DmaapProducer(pool,writeTopics);
-
- if (apiKey != null && apiSecret != null) {
- out.updateCredentials(apiKey, apiSecret);
- }
-
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- out.useHttps(true);
- break;
- }
+ Producer out = null;
+ BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
+ if (ctx != null) {
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret);
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ out.useHttps(true);
+ break;
+ }
+ }
+ }
}
return out;
}
@@ -236,23 +254,11 @@ public class EventHandlerImpl implements EventHandler {
@Override
public void closeClients() {
LOG.debug("Closing Consumer and Producer DMaaP clients");
- switch (messageService) {
- case DMaaP:
- if (reader != null) {
- ((DmaapConsumer) reader).close();
- }
- if (producer != null) {
- ((DmaapProducer) producer).close();
- }
- break;
- default:
- // close DMaaP clients
- if (reader != null) {
- ((DmaapConsumer) reader).close();
- }
- if (producer != null) {
- ((DmaapProducer) producer).close();
- }
+ if (reader != null) {
+ reader.close();
+ }
+ if (producer != null) {
+ producer.close();
}
}