aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java112
1 files changed, 92 insertions, 20 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 06ef080..79317ea 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,10 +19,24 @@
*/
package org.onap.universalvesadapter.service;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
import org.onap.universalvesadapter.dmaap.Creator;
import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
+import org.onap.universalvesadapter.exception.DMaapException;
import org.onap.universalvesadapter.exception.MapperConfigException;
+import org.onap.universalvesadapter.exception.VesException;
+import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrival;
+import org.onap.universalvesadapter.utils.DmaapConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,12 +56,13 @@ public class VesService {
private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
private boolean isRunning = true;
-
- @Autowired
- private DMaapService dmaapService;
-
@Autowired
private Creator creator;
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+ @Autowired
+ private DmaapConfig dmaapConfig;
+ private static List<String> list = new LinkedList<String>();
/**
@@ -55,27 +70,66 @@ public class VesService {
*/
public void start() throws MapperConfigException {
debugLogger.info("Creating Subcriber and Publisher with creator.............");
- DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber();
+
DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
- // Create subscriber & publisher thread
- Thread t1 = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
+ String topicArray[]= CollectorConfigPropertyRetrival.getProperyArray("subscriberTopic");
+
+
+ ExecutorService executorService=Executors.newFixedThreadPool(topicArray.length);
+ for(int i=0;i<topicArray.length;i++) {
+ String topicName =topicArray[i];
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicArray[i]);
+
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run(){
+
+ Thread.currentThread().setName(topicName);
+ metricsLogger.info("fetch and publish from and to Dmaap started:"+Thread.currentThread().getName());
+ int pollingInternalInt=dmaapConfig.getPollingInterval();
+ debugLogger.info("The Polling Interval in Milli Second is :{}" +pollingInternalInt);
debugLogger.info("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
- dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator);
- } catch (InterruptedException e) {
- errorLogger.error("Exception in starting of subscriber & publisher thread:{}",e);
- Thread.currentThread().interrupt();
- }
- }
- });
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : subcriber.fetchMessages().getFetchedMessages()) {
+ list.add(incomingJsonString);
- // Start subscriber & publisher thread
- t1.setName("SNMP-COLLECTOR");
- t1.start();
+ }
+
+ if (list.isEmpty()) {
+ try {
+ Thread.sleep(pollingInternalInt);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ debugLogger.debug("number of messages to be converted :{}", list.size());
+
+ if (!list.isEmpty()) {
+ String val = ((LinkedList<String>) list).removeFirst();
+ List<String> messages = new ArrayList<>();
+ String vesEvent = processReceivedJson(val);
+ if (vesEvent!=null && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+ metricsLogger.info("Message successfully published to DMaaP Topic");
+ }
+
+ }
+
+ }
+ }
+
+
+
+ }
+ });
+ }
+
+
}
@@ -85,5 +139,23 @@ public class VesService {
public void stop() {
isRunning = false;
}
+
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString);
+
+ } catch (VesException exception) {
+ errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
+ debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ errorLogger.error("Received exception : {}", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
}