diff options
author | Daniel Silverthorn <daniel.silverthorn@amdocs.com> | 2018-05-25 16:09:38 -0400 |
---|---|---|
committer | Daniel Silverthorn <daniel.silverthorn@amdocs.com> | 2018-05-25 16:10:36 -0400 |
commit | 6891c2008b93e09f5a67e45b859696cd13466f8a (patch) | |
tree | e7401e68f98c38432fce0bcfec7e5333be92a896 /src/main/java/org/onap/aai/event/EventBusConsumer.java | |
parent | 98f4f48c7d0352e2416123067e88a24cb3708f8c (diff) |
Remove cambria client dependency
Change-Id: I9760839ae44df851640b271d032a39f4bb3691c2
Issue-ID: AAI-1182
Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
Diffstat (limited to 'src/main/java/org/onap/aai/event/EventBusConsumer.java')
-rw-r--r-- | src/main/java/org/onap/aai/event/EventBusConsumer.java | 52 |
1 files changed, 11 insertions, 41 deletions
diff --git a/src/main/java/org/onap/aai/event/EventBusConsumer.java b/src/main/java/org/onap/aai/event/EventBusConsumer.java index a9d5478..e795e2c 100644 --- a/src/main/java/org/onap/aai/event/EventBusConsumer.java +++ b/src/main/java/org/onap/aai/event/EventBusConsumer.java @@ -20,9 +20,7 @@ */ package org.onap.aai.event; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; +import org.onap.aai.event.api.EventConsumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer { private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class); private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class); - private final EventBusEndpoint endpoint; + private final AbstractEventBusEndpoint endpoint; - private CambriaConsumer consumer; + private EventConsumer consumer; /** * EventBusConsumer Constructor. */ - public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) { + public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) { super(endpoint, processor); super.setDelay(endpoint.getPollingDelay()); this.endpoint = endpoint; setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize())); - String[] urls = endpoint.getUrl().split(","); - - List<String> urlList = null; - - if (urls != null) { - urlList = Arrays.asList(urls); - } - - try { - - ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder() - .usingHosts(urlList).onTopic(endpoint.getEventTopic()) - .knownAs(endpoint.getGroupName(), endpoint.getGroupId()); - - String apiKey = endpoint.getApiKey(); - String apiSecret = endpoint.getApiSecret(); - - if (apiKey != null && apiSecret != null) { - consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret()); - } - - consumer = consumerBuilder.build(); - - } catch (MalformedURLException | GeneralSecurityException e) { - logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage()); - } + this.consumer = consumer; } /** @@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer { int processCount = 0; - Iterable<String> messages = consumer.fetch(); + //Iterable<String> messages = consumer.fetch(); + Iterable<String> messages = consumer.consumeAndCommit(); String topic = endpoint.getEventTopic(); @@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer { @Override protected void doStop() throws Exception { super.doStop(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } @Override protected void doShutdown() throws Exception { super.doShutdown(); - if (consumer != null) { - consumer.close(); + if (endpoint != null) { + endpoint.close(); } } |