aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java197
1 files changed, 123 insertions, 74 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
index 81cb4b8..112d1d6 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java
@@ -19,16 +19,28 @@
*/
package org.onap.universalvesadapter.service;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
import javax.annotation.Resource;
+
+import org.milyn.io.FileUtils;
import org.onap.universalvesadapter.adapter.GenericAdapter;
import org.onap.universalvesadapter.exception.ConfigFileReadException;
import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.exception.MapperConfigException;
import org.onap.universalvesadapter.exception.VesException;
+import org.onap.universalvesadapter.utils.MapperConfigUtils;
+import org.onap.universalvesadapter.utils.ParallelTasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
+import org.springframework.util.FileCopyUtils;
/**
* Service that starts the universal ves adapter module to listen for events
@@ -38,10 +50,13 @@ import org.springframework.stereotype.Component;
*/
@Component
public class VesService {
-
- private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
-
- private boolean isRunning = true;
+
+ private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
+
+ private boolean isRunning = true;
+
+ @Autowired
+ private ConfigurableApplicationContext ctx;
@Autowired
private DMaapService dmaapService;
@@ -49,80 +64,114 @@ public class VesService {
@Autowired
private AdapterService adapterService;
- @Resource(name="universalEventAdapter")
+ @Resource(name = "universalEventAdapter")
private GenericAdapter eventAdapter;
@Value("${messagesInBatch}")
- private int messagesInBatch;
-
- /*public void start(){
-
- String incomingJsonString = dmaapService.consume();
- if(!"".equals(incomingJsonString)){
- GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter();
- String outgoingJsonString = eventAdapter.transform(incomingJsonString);
- System.out.println(outgoingJsonString);
- }
- }*/
+ private int messagesInBatch;
+
+ @Value("${messagesInTimeInterval}")
+ private long messagesInTimeInverval;
+
+ @Value("${mapperConfig.file}")
+ private String mapperConfigFile;
+ /*public void start(){
+
+ String incomingJsonString = dmaapService.consume();
+ if(!"".equals(incomingJsonString)){
+ GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter();
+ String outgoingJsonString = eventAdapter.transform(incomingJsonString);
+ System.out.println(outgoingJsonString);
+ }
+ }*/
- /**
- * method triggers universal ves adapter module
- */
- public void start() {
- /*ParallelTasks parallelTasks = new ParallelTasks();
- int processingNumberOfMessage = 0;
- while (isRunning) {
- try {
- for(String incomingJsonString : dmaapService.consumeFromDMaap()){
- parallelTasks.add(() -> processReceivedJson(incomingJsonString));
- processingNumberOfMessage++;
- if(processingNumberOfMessage == messagesInBatch){
- parallelTasks.startParallelTasks();
- processingNumberOfMessage=0;
- parallelTasks = new ParallelTasks();
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- } catch (DMaapException e) {
- }
- }*/
- String incomingJsonString = dmaapService.consume();
- processReceivedJson(incomingJsonString);
- }
+
+ /**
+ * method triggers universal ves adapter module.
+ */
+ public void start() {
+
+ try {
+ String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile));
+ MapperConfigUtils.readMapperConfigFile(mappingConfigFileData);
+
+
+ ParallelTasks parallelTasks = new ParallelTasks();
+ while (isRunning) {
+ int processingNumberOfMessage = 0;
+ long start = System.currentTimeMillis();
+ for (String incomingJsonString : dmaapService.consumeFromDMaap()) {
+ parallelTasks.add(() -> processReceivedJson(incomingJsonString));
+ processingNumberOfMessage++;
+ if (processingNumberOfMessage == messagesInBatch
+ || (System.currentTimeMillis() - start) > messagesInTimeInverval) {
+ processingNumberOfMessage = 0;
+ start = System.currentTimeMillis();
+ try {
+ parallelTasks.startParallelTasks();
+ } catch (InterruptedException exception) {
+ LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
+ }
+ parallelTasks = new ParallelTasks();
+ }
+ }
+ try {
+ parallelTasks.startParallelTasks();
+ } catch (InterruptedException exception) {
+ LOGGER.error("Processing was interrupted due to :" + exception.getMessage());
+ }
+ parallelTasks = new ParallelTasks();
+ }
+
+ /*String incomingJsonString = "";
+ incomingJsonString = dmaapService.consume();
+ processReceivedJson(incomingJsonString);*/
+ } catch (Exception exception) {
+ LOGGER.error("Reported exception : " + exception.getMessage(), exception);
+ }
+ }
- /**
- * It finds mapping file for received json, transforms json to VES format
- * and publishes it to outgoing DMaap MR Topic
- *
- * @param incomingJsonString
- */
- private void processReceivedJson(String incomingJsonString) {
- try {
- LOGGER.debug("Received incoming message" + incomingJsonString);
- if (!"".equals(incomingJsonString)) {
- String eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
- LOGGER.debug("Event identified as " + eventType);
- String outgoingJsonString;
- outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType);
- LOGGER.debug("Output VES json to be sent " + outgoingJsonString);
- }
- } catch (ConfigFileReadException | ConfigFileSmooksConversionException | VesException exception) {
- LOGGER.error(exception.getMessage());
- }
- }
-
- /**
- * method stops universal ves adapter module
- */
- public void stop() {
-
- isRunning = false;
- }
+ /**
+ * It finds mapping file for received json, transforms json to VES format
+ * and publishes it to outgoing DMaap MR Topic
+ *
+ * @param incomingJsonString
+ */
+ private void processReceivedJson(String incomingJsonString){
+ LOGGER.debug("Received incoming message : " + incomingJsonString);
+ if (!"".equals(incomingJsonString)) {
+ String eventType;
+ try {
+ eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
+
+ LOGGER.debug("Event identified as " + eventType);
+ String outgoingJsonString;
+ outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType);
+ LOGGER.debug("Output VES json to be sent " + outgoingJsonString);
+
+// dmaapService.addMessageInOutgoingQueue(outgoingJsonString);
+// LOGGER.debug("Added message in outgoing Queue ");
+
+ dmaapService.publishToDMaap(outgoingJsonString);
+ LOGGER.debug("Sent message in outgoing Queue ");
+
+
+ } catch (VesException exception) {
+ LOGGER.error("Received exception : " + exception.getMessage(), exception);
+
+ //TODO KKM : Do we wish to continue the application with same exception in every thread??
+ LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ ctx.close();
+ }
+ }
+ }
+
+ /**
+ * method stops universal ves adapter module
+ */
+ public void stop() {
+
+ isRunning = false;
+ }
}