From a74c306af14723da1f8a24bea5e3a8016dc449e0 Mon Sep 17 00:00:00 2001 From: "Leigh, Phillip (pl876u)" Date: Wed, 21 Nov 2018 16:43:06 -0500 Subject: [PIE-991]Create Audit Topics in CtxAggr Issue-ID: LOG-811 Change-Id: I763cfb90b399514c5930539a1b71e762cb43bc6d Signed-off-by: Leigh, Phillip (pl876u) --- src/main/docker/Dockerfile | 2 +- .../contextaggregator/config/TransportConfig.java | 26 ++++++++++++- .../exception/ContextAggregatorError.java | 3 +- .../service/ContextAggregatorProcessor.java | 44 ++++++++++++++++++++++ 4 files changed, 72 insertions(+), 3 deletions(-) (limited to 'src/main') diff --git a/src/main/docker/Dockerfile b/src/main/docker/Dockerfile index ad64f1d..1cd5e65 100644 --- a/src/main/docker/Dockerfile +++ b/src/main/docker/Dockerfile @@ -15,6 +15,6 @@ ADD startService.sh $MICROSERVICE_HOME/bin/ RUN chmod 755 $MICROSERVICE_HOME/config/* RUN chmod 755 $MICROSERVICE_HOME/lib/* RUN chmod 755 $MICROSERVICE_HOME/bin/* -RUN apk --no-cache add curl +# RUN apk --no-cache add curl CMD ["/opt/app/bin/startService.sh"] diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java index 12f0cc8..1532f43 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java +++ b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java @@ -17,18 +17,21 @@ */ package org.onap.pomba.contextaggregator.config; +import java.util.Collection; import java.util.Properties; + import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; + import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRTopicManager; import com.att.nsa.mr.client.impl.MRConsumerImpl; @Configuration public class TransportConfig { - @Bean public MRConsumer consumer(@Value("${transport.consume.host}") String host, @Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic, @@ -38,6 +41,7 @@ public class TransportConfig { @Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize, @Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) { + String hostStr = host + ":" + port; final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup, @@ -50,6 +54,25 @@ public class TransportConfig { return consumer; } + @Bean + public MRTopicManager messageRouterTopicMgr (@Value("${transport.consume.host}") String host, + @Value("${transport.consume.port}") String port, + @Value("${transport.message-router.apiKey}") String apiKey, + @Value("${transport.message-router.apiSecret}") String apiSecret + ) { + + String hostStr = host + ":" + port; + // Verify if all topics () + Collection hostSet = java.util.Arrays.asList(hostStr); + MRTopicManager mgr = MRClientFactory.createTopicManager(hostSet, apiKey, apiSecret); + return mgr; + } + + @Bean + public String messageRouterRequiredPombaTopicList(@Value("${transport.message-router.requiredPombaTopics}") String requiredPombaTopics) { + return requiredPombaTopics; + } + @Bean public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host, @Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic, @@ -63,4 +86,5 @@ public class TransportConfig { return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition, retries); } + } diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java index cee1fda..396f684 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java +++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java @@ -30,7 +30,8 @@ public enum ContextAggregatorError { PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"), PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"), FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"), - BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}"); + BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}"), + FAILED_TO_CREATE_POMBA_TOPICS("CA-110", "Failed to create POMBA Topics: {0}"); private String errorId; private String message; diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java index e2758ab..f79bf96 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java +++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java @@ -20,16 +20,22 @@ package org.onap.pomba.contextaggregator.service; import com.att.aft.dme2.internal.gson.Gson; import com.att.aft.dme2.internal.gson.GsonBuilder; import com.att.aft.dme2.internal.gson.JsonSyntaxException; +import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; import com.att.nsa.mr.client.MRPublisher; +import com.att.nsa.mr.client.MRTopicManager; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,6 +52,8 @@ import org.onap.pomba.contextaggregator.rest.RestRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component @@ -59,6 +67,12 @@ public class ContextAggregatorProcessor implements Callable { @Autowired private MRConsumer consumer; + @Autowired + private MRTopicManager messageRouterTopicMgr; + + @Autowired + private String messageRouterRequiredPombaTopicList; + @Autowired private EventPublisherFactory publisherFactory; @@ -110,6 +124,8 @@ public class ContextAggregatorProcessor implements Callable { @Override public Void call() throws Exception { + createPombaTopics(); + while (true) { for (String event : consumer.fetch()) { executor.execute(() -> { @@ -233,5 +249,33 @@ public class ContextAggregatorProcessor implements Callable { } } + private List getRequiredTopicList(String messageRouterRequiredPombaTopicList) { + List pombaTopicList = new ArrayList(); + String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", ""); + String[] pombaTopicStrSet = noSpacePombaTopicList.split(","); + for (int i = 0; i < pombaTopicStrSet.length; i++) { + pombaTopicList.add(new String(pombaTopicStrSet[i])); + } + return pombaTopicList; + } + + private void createPombaTopics () { + + List requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList); + + String topicDescription = "create default topic"; + int partitionCount = 1; + int replicationCount = 1; + + for ( String topic_required : requiredTopicList) { + try { + messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount); + } catch (HttpException e1) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage())); + } catch (IOException e) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage())); + } + } + } } -- cgit 1.2.3-korg