summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/pomba/contextaggregator/service
diff options
context:
space:
mode:
authorLeigh, Phillip (pl876u) <phillip.leigh@amdocs.com>2018-11-21 16:43:06 -0500
committerLeigh, Phillip (pl876u) <phillip.leigh@amdocs.com>2018-11-22 11:47:03 -0500
commita74c306af14723da1f8a24bea5e3a8016dc449e0 (patch)
tree068e19c69e97e3e36e8266c25b742a4a3ad86a63 /src/main/java/org/onap/pomba/contextaggregator/service
parent7de0e85a95552130aaa7cea07487543b2700e9bf (diff)
[PIE-991]Create Audit Topics in CtxAggr
Issue-ID: LOG-811 Change-Id: I763cfb90b399514c5930539a1b71e762cb43bc6d Signed-off-by: Leigh, Phillip (pl876u) <phillip.leigh@amdocs.com>
Diffstat (limited to 'src/main/java/org/onap/pomba/contextaggregator/service')
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java44
1 files changed, 44 insertions, 0 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
index e2758ab..f79bf96 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service;
import com.att.aft.dme2.internal.gson.Gson;
import com.att.aft.dme2.internal.gson.GsonBuilder;
import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@@ -60,6 +68,12 @@ public class ContextAggregatorProcessor implements Callable<Void> {
private MRConsumer consumer;
@Autowired
+ private MRTopicManager messageRouterTopicMgr;
+
+ @Autowired
+ private String messageRouterRequiredPombaTopicList;
+
+ @Autowired
private EventPublisherFactory publisherFactory;
@Autowired
@@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
@Override
public Void call() throws Exception {
+ createPombaTopics();
+
while (true) {
for (String event : consumer.fetch()) {
executor.execute(() -> {
@@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
}
+ private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
+ List<String> pombaTopicList = new ArrayList<String>();
+ String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
+ String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
+ for (int i = 0; i < pombaTopicStrSet.length; i++) {
+ pombaTopicList.add(new String(pombaTopicStrSet[i]));
+ }
+ return pombaTopicList;
+ }
+
+ private void createPombaTopics () {
+
+ List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
+
+ String topicDescription = "create default topic";
+ int partitionCount = 1;
+ int replicationCount = 1;
+
+ for ( String topic_required : requiredTopicList) {
+ try {
+ messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
+ } catch (HttpException e1) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+ } catch (IOException e) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+ }
+ }
+ }
}