aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java185
1 files changed, 47 insertions, 138 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 112d1d6..ed590a8 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,28 +19,14 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-
-import javax.annotation.Resource;
-
-import org.milyn.io.FileUtils;
-import org.onap.universalvesadapter.adapter.GenericAdapter;
-import org.onap.universalvesadapter.exception.ConfigFileReadException;
-import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
-import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.MapperConfigException;
-import org.onap.universalvesadapter.exception.VesException;
-import org.onap.universalvesadapter.utils.MapperConfigUtils;
-import org.onap.universalvesadapter.utils.ParallelTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
-import org.springframework.util.FileCopyUtils;
/**
* Service that starts the universal ves adapter module to listen for events
@@ -50,128 +36,51 @@ import org.springframework.util.FileCopyUtils;
*/
@Component
public class VesService {
-
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- private boolean isRunning = true;
-
- @Autowired
- private ConfigurableApplicationContext ctx;
-
- @Autowired
- private DMaapService dmaapService;
-
- @Autowired
- private AdapterService adapterService;
-
- @Resource(name = "universalEventAdapter")
- private GenericAdapter eventAdapter;
- @Value("${messagesInBatch}")
- private int messagesInBatch;
+ private final Logger LOGGER = LoggerFactory.getLogger(VesService.class);
+
+ private boolean isRunning = true;
+
+ @Autowired
+ private DMaapService dmaapService;
+
+ @Autowired
+ private Creator creator;
+
- @Value("${messagesInTimeInterval}")
- private long messagesInTimeInverval;
-
- @Value("${mapperConfig.file}")
- private String mapperConfigFile;
-
- /*public void start(){
-
- String incomingJsonString = dmaapService.consume();
- if(!"".equals(incomingJsonString)){
- GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter();
- String outgoingJsonString = eventAdapter.transform(incomingJsonString);
- System.out.println(outgoingJsonString);
- }
- }*/
-
-
- /**
- * method triggers universal ves adapter module.
- */
- public void start() {
-
- try {
- String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile));
- MapperConfigUtils.readMapperConfigFile(mappingConfigFileData);
-
-
- ParallelTasks parallelTasks = new ParallelTasks();
- while (isRunning) {
- int processingNumberOfMessage = 0;
- long start = System.currentTimeMillis();
- for (String incomingJsonString : dmaapService.consumeFromDMaap()) {
- parallelTasks.add(() -> processReceivedJson(incomingJsonString));
- processingNumberOfMessage++;
- if (processingNumberOfMessage == messagesInBatch
- || (System.currentTimeMillis() - start) > messagesInTimeInverval) {
- processingNumberOfMessage = 0;
- start = System.currentTimeMillis();
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
- }
- try {
- parallelTasks.startParallelTasks();
- } catch (InterruptedException exception) {
- LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
- }
- parallelTasks = new ParallelTasks();
- }
-
- /*String incomingJsonString = "";
- incomingJsonString = dmaapService.consume();
- processReceivedJson(incomingJsonString);*/
- } catch (Exception exception) {
- LOGGER.error("Reported exception : " + exception.getMessage(), exception);
- }
- }
+ /**
+ * method triggers universal VES adapter module.
+ */
+ public void start() throws MapperConfigException {
+ LOGGER.debug("Creating Subcriber and Publisher with creator.............");
+ DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber();
- /**
- * It finds mapping file for received json, transforms json to VES format
- * and publishes it to outgoing DMaap MR Topic
- *
- * @param incomingJsonString
- */
- private void processReceivedJson(String incomingJsonString){
- LOGGER.debug("Received incoming message : " + incomingJsonString);
- if (!"".equals(incomingJsonString)) {
- String eventType;
- try {
- eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
-
- LOGGER.debug("Event identified as " + eventType);
- String outgoingJsonString;
- outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType);
- LOGGER.debug("Output VES json to be sent " + outgoingJsonString);
-
-// dmaapService.addMessageInOutgoingQueue(outgoingJsonString);
-// LOGGER.debug("Added message in outgoing Queue ");
-
- dmaapService.publishToDMaap(outgoingJsonString);
- LOGGER.debug("Sent message in outgoing Queue ");
-
-
- } catch (VesException exception) {
- LOGGER.error("Received exception : " + exception.getMessage(), exception);
-
- //TODO KKM : Do we wish to continue the application with same exception in every thread??
- LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
- ctx.close();
- }
- }
- }
-
- /**
- * method stops universal ves adapter module
- */
- public void stop() {
-
- isRunning = false;
- }
+ DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher();
+
+ // Create subscriber & publisher thread
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOGGER.debug("starting subscriber & publisher thread:{}", Thread.currentThread().getName());
+ dmaapService.fetchAndPublishInDMaaP(subcriber, publisher, creator);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // Start subscriber & publisher thread
+ t1.setName("SNMP-COLLECTOR");
+ t1.start();
+
+ }
+
+ /**
+ * method stops universal ves adapter module
+ */
+ public void stop() {
+ isRunning = false;
+ }
}
+