diff options
author | Leigh, Phillip (pl876u) <phillip.leigh@amdocs.com> | 2018-11-21 16:43:06 -0500 |
---|---|---|
committer | Leigh, Phillip (pl876u) <phillip.leigh@amdocs.com> | 2018-11-22 11:47:03 -0500 |
commit | a74c306af14723da1f8a24bea5e3a8016dc449e0 (patch) | |
tree | 068e19c69e97e3e36e8266c25b742a4a3ad86a63 /src/main/java/org/onap/pomba/contextaggregator/service | |
parent | 7de0e85a95552130aaa7cea07487543b2700e9bf (diff) |
[PIE-991]Create Audit Topics in CtxAggr
Issue-ID: LOG-811
Change-Id: I763cfb90b399514c5930539a1b71e762cb43bc6d
Signed-off-by: Leigh, Phillip (pl876u) <phillip.leigh@amdocs.com>
Diffstat (limited to 'src/main/java/org/onap/pomba/contextaggregator/service')
-rw-r--r-- | src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java index e2758ab..f79bf96 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java +++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java @@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service; import com.att.aft.dme2.internal.gson.Gson; import com.att.aft.dme2.internal.gson.GsonBuilder; import com.att.aft.dme2.internal.gson.JsonSyntaxException; +import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; import com.att.nsa.mr.client.MRPublisher; +import com.att.nsa.mr.client.MRTopicManager; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component @@ -60,6 +68,12 @@ public class ContextAggregatorProcessor implements Callable<Void> { private MRConsumer consumer; @Autowired + private MRTopicManager messageRouterTopicMgr; + + @Autowired + private String messageRouterRequiredPombaTopicList; + + @Autowired private EventPublisherFactory publisherFactory; @Autowired @@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable<Void> { @Override public Void call() throws Exception { + createPombaTopics(); + while (true) { for (String event : consumer.fetch()) { executor.execute(() -> { @@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable<Void> { } } + private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) { + List<String> pombaTopicList = new ArrayList<String>(); + String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", ""); + String[] pombaTopicStrSet = noSpacePombaTopicList.split(","); + for (int i = 0; i < pombaTopicStrSet.length; i++) { + pombaTopicList.add(new String(pombaTopicStrSet[i])); + } + return pombaTopicList; + } + + private void createPombaTopics () { + + List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList); + + String topicDescription = "create default topic"; + int partitionCount = 1; + int replicationCount = 1; + + for ( String topic_required : requiredTopicList) { + try { + messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount); + } catch (HttpException e1) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage())); + } catch (IOException e) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage())); + } + } + } } |