summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java')
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java44
1 files changed, 44 insertions, 0 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
index e2758ab..f79bf96 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service;
import com.att.aft.dme2.internal.gson.Gson;
import com.att.aft.dme2.internal.gson.GsonBuilder;
import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@@ -60,6 +68,12 @@ public class ContextAggregatorProcessor implements Callable<Void> {
private MRConsumer consumer;
@Autowired
+ private MRTopicManager messageRouterTopicMgr;
+
+ @Autowired
+ private String messageRouterRequiredPombaTopicList;
+
+ @Autowired
private EventPublisherFactory publisherFactory;
@Autowired
@@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
@Override
public Void call() throws Exception {
+ createPombaTopics();
+
while (true) {
for (String event : consumer.fetch()) {
executor.execute(() -> {
@@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
}
+ private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
+ List<String> pombaTopicList = new ArrayList<String>();
+ String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
+ String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
+ for (int i = 0; i < pombaTopicStrSet.length; i++) {
+ pombaTopicList.add(new String(pombaTopicStrSet[i]));
+ }
+ return pombaTopicList;
+ }
+
+ private void createPombaTopics () {
+
+ List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
+
+ String topicDescription = "create default topic";
+ int partitionCount = 1;
+ int replicationCount = 1;
+
+ for ( String topic_required : requiredTopicList) {
+ try {
+ messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
+ } catch (HttpException e1) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+ } catch (IOException e) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+ }
+ }
+ }
}