diff options
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java')
-rw-r--r-- | UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java | 112 |
1 files changed, 92 insertions, 20 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index 06ef080..79317ea 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -19,10 +19,24 @@ */ package org.onap.universalvesadapter.service; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.onap.universalvesadapter.adapter.UniversalEventAdapter; import org.onap.universalvesadapter.dmaap.Creator; import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; +import org.onap.universalvesadapter.exception.DMaapException; import org.onap.universalvesadapter.exception.MapperConfigException; +import org.onap.universalvesadapter.exception.VesException; +import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival; +import org.onap.universalvesadapter.utils.DmaapConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,12 +56,13 @@ public class VesService { private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); private boolean isRunning = true; - - @Autowired - private DMaapService dmaapService; - @Autowired private Creator creator; + @Autowired + private UniversalEventAdapter eventAdapter; + @Autowired + private DmaapConfig dmaapConfig; + private static List<String> list = new LinkedList<String>(); /** @@ -55,27 +70,66 @@ public class VesService { */ public void start() throws MapperConfigException { debugLogger.info("Creating Subcriber and Publisher with creator............."); - DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(); + DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(); - // Create subscriber & publisher thread - Thread t1 = new Thread(new Runnable() { - @Override - public void run() { - try { + String topicArray[]= CollectorConfigPropertyRetrival.getProperyArray("subscriberTopic"); + + + ExecutorService executorService=Executors.newFixedThreadPool(topicArray.length); + for(int i=0;i<topicArray.length;i++) { + String topicName =topicArray[i]; + DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicArray[i]); + + executorService.submit(new Runnable() { + + @Override + public void run(){ + + Thread.currentThread().setName(topicName); + metricsLogger.info("fetch and publish from and to Dmaap started:"+Thread.currentThread().getName()); + int pollingInternalInt=dmaapConfig.getPollingInterval(); + debugLogger.info("The Polling Interval in Milli Second is :{}" +pollingInternalInt); debugLogger.info("starting subscriber & publisher thread:{}", Thread.currentThread().getName()); - dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator); - } catch (InterruptedException e) { - errorLogger.error("Exception in starting of subscriber & publisher thread:{}",e); - Thread.currentThread().interrupt(); - } - } - }); + while (true) { + synchronized (this) { + for (String incomingJsonString : subcriber.fetchMessages().getFetchedMessages()) { + list.add(incomingJsonString); - // Start subscriber & publisher thread - t1.setName("SNMP-COLLECTOR"); - t1.start(); + } + + if (list.isEmpty()) { + try { + Thread.sleep(pollingInternalInt); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + debugLogger.debug("number of messages to be converted :{}", list.size()); + + if (!list.isEmpty()) { + String val = ((LinkedList<String>) list).removeFirst(); + List<String> messages = new ArrayList<>(); + String vesEvent = processReceivedJson(val); + if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) { + messages.add(vesEvent); + publisher.publish(messages); + metricsLogger.info("Message successfully published to DMaaP Topic"); + } + + } + + } + } + + + + } + }); + } + + } @@ -85,5 +139,23 @@ public class VesService { public void stop() { isRunning = false; } + + private String processReceivedJson(String incomingJsonString) { + String outgoingJsonString = null; + if (!"".equals(incomingJsonString)) { + + try { + + outgoingJsonString = eventAdapter.transform(incomingJsonString); + + } catch (VesException exception) { + errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception); + debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + } catch (DMaapException e) { + errorLogger.error("Received exception : {}", e.getMessage()); + } + } + return outgoingJsonString; + } } |