summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java26
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java3
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java44
3 files changed, 71 insertions, 2 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
index 12f0cc8..1532f43 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
@@ -17,18 +17,21 @@
*/
package org.onap.pomba.contextaggregator.config;
+import java.util.Collection;
import java.util.Properties;
+
import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRTopicManager;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
@Configuration
public class TransportConfig {
-
@Bean
public MRConsumer consumer(@Value("${transport.consume.host}") String host,
@Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic,
@@ -38,6 +41,7 @@ public class TransportConfig {
@Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize,
@Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) {
+
String hostStr = host + ":" + port;
final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup,
@@ -51,6 +55,25 @@ public class TransportConfig {
}
@Bean
+ public MRTopicManager messageRouterTopicMgr (@Value("${transport.consume.host}") String host,
+ @Value("${transport.consume.port}") String port,
+ @Value("${transport.message-router.apiKey}") String apiKey,
+ @Value("${transport.message-router.apiSecret}") String apiSecret
+ ) {
+
+ String hostStr = host + ":" + port;
+ // Verify if all topics ()
+ Collection<String> hostSet = java.util.Arrays.asList(hostStr);
+ MRTopicManager mgr = MRClientFactory.createTopicManager(hostSet, apiKey, apiSecret);
+ return mgr;
+ }
+
+ @Bean
+ public String messageRouterRequiredPombaTopicList(@Value("${transport.message-router.requiredPombaTopics}") String requiredPombaTopics) {
+ return requiredPombaTopics;
+ }
+
+ @Bean
public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host,
@Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic,
@Value("${transport.publish.motsid}") String motsid, @Value("${transport.publish.pass}") String pass,
@@ -63,4 +86,5 @@ public class TransportConfig {
return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition,
retries);
}
+
}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
index cee1fda..396f684 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
@@ -30,7 +30,8 @@ public enum ContextAggregatorError {
PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"),
PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"),
FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"),
- BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}");
+ BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}"),
+ FAILED_TO_CREATE_POMBA_TOPICS("CA-110", "Failed to create POMBA Topics: {0}");
private String errorId;
private String message;
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
index e2758ab..f79bf96 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service;
import com.att.aft.dme2.internal.gson.Gson;
import com.att.aft.dme2.internal.gson.GsonBuilder;
import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.MRConsumer;
import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@@ -60,6 +68,12 @@ public class ContextAggregatorProcessor implements Callable<Void> {
private MRConsumer consumer;
@Autowired
+ private MRTopicManager messageRouterTopicMgr;
+
+ @Autowired
+ private String messageRouterRequiredPombaTopicList;
+
+ @Autowired
private EventPublisherFactory publisherFactory;
@Autowired
@@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
@Override
public Void call() throws Exception {
+ createPombaTopics();
+
while (true) {
for (String event : consumer.fetch()) {
executor.execute(() -> {
@@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
}
+ private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
+ List<String> pombaTopicList = new ArrayList<String>();
+ String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
+ String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
+ for (int i = 0; i < pombaTopicStrSet.length; i++) {
+ pombaTopicList.add(new String(pombaTopicStrSet[i]));
+ }
+ return pombaTopicList;
+ }
+
+ private void createPombaTopics () {
+
+ List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
+
+ String topicDescription = "create default topic";
+ int partitionCount = 1;
+ int replicationCount = 1;
+
+ for ( String topic_required : requiredTopicList) {
+ try {
+ messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
+ } catch (HttpException e1) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+ } catch (IOException e) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+ }
+ }
+ }
}