aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/event/EventBusConsumer.java
diff options
context:
space:
mode:
authorDaniel Silverthorn <daniel.silverthorn@amdocs.com>2018-05-25 16:09:38 -0400
committerDaniel Silverthorn <daniel.silverthorn@amdocs.com>2018-05-25 16:10:36 -0400
commit6891c2008b93e09f5a67e45b859696cd13466f8a (patch)
treee7401e68f98c38432fce0bcfec7e5333be92a896 /src/main/java/org/onap/aai/event/EventBusConsumer.java
parent98f4f48c7d0352e2416123067e88a24cb3708f8c (diff)
Remove cambria client dependency
Change-Id: I9760839ae44df851640b271d032a39f4bb3691c2 Issue-ID: AAI-1182 Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
Diffstat (limited to 'src/main/java/org/onap/aai/event/EventBusConsumer.java')
-rw-r--r--src/main/java/org/onap/aai/event/EventBusConsumer.java52
1 files changed, 11 insertions, 41 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java
index a9d5478..e795e2c 100644
--- a/src/main/java/org/onap/aai/event/EventBusConsumer.java
+++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java
@@ -20,9 +20,7 @@
*/
package org.onap.aai.event;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer {
private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
- private final EventBusEndpoint endpoint;
+ private final AbstractEventBusEndpoint endpoint;
- private CambriaConsumer consumer;
+ private EventConsumer consumer;
/**
* EventBusConsumer Constructor.
*/
- public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+ public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
super(endpoint, processor);
super.setDelay(endpoint.getPollingDelay());
this.endpoint = endpoint;
setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
- String[] urls = endpoint.getUrl().split(",");
-
- List<String> urlList = null;
-
- if (urls != null) {
- urlList = Arrays.asList(urls);
- }
-
- try {
-
- ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList).onTopic(endpoint.getEventTopic())
- .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
- String apiKey = endpoint.getApiKey();
- String apiSecret = endpoint.getApiSecret();
-
- if (apiKey != null && apiSecret != null) {
- consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
- }
-
- consumer = consumerBuilder.build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage());
- }
+ this.consumer = consumer;
}
/**
@@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer {
int processCount = 0;
- Iterable<String> messages = consumer.fetch();
+ //Iterable<String> messages = consumer.fetch();
+ Iterable<String> messages = consumer.consumeAndCommit();
String topic = endpoint.getEventTopic();
@@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}