diff options
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java')
-rw-r--r-- | UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java | 197 |
1 files changed, 123 insertions, 74 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index 81cb4b8..112d1d6 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -19,16 +19,28 @@ */ package org.onap.universalvesadapter.service; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; + import javax.annotation.Resource; + +import org.milyn.io.FileUtils; import org.onap.universalvesadapter.adapter.GenericAdapter; import org.onap.universalvesadapter.exception.ConfigFileReadException; import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.exception.MapperConfigException; import org.onap.universalvesadapter.exception.VesException; +import org.onap.universalvesadapter.utils.MapperConfigUtils; +import org.onap.universalvesadapter.utils.ParallelTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; +import org.springframework.util.FileCopyUtils; /** * Service that starts the universal ves adapter module to listen for events @@ -38,10 +50,13 @@ import org.springframework.stereotype.Component; */ @Component public class VesService { - - private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - private boolean isRunning = true; + + private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + + private boolean isRunning = true; + + @Autowired + private ConfigurableApplicationContext ctx; @Autowired private DMaapService dmaapService; @@ -49,80 +64,114 @@ public class VesService { @Autowired private AdapterService adapterService; - @Resource(name="universalEventAdapter") + @Resource(name = "universalEventAdapter") private GenericAdapter eventAdapter; @Value("${messagesInBatch}") - private int messagesInBatch; - - /*public void start(){ - - String incomingJsonString = dmaapService.consume(); - if(!"".equals(incomingJsonString)){ - GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter(); - String outgoingJsonString = eventAdapter.transform(incomingJsonString); - System.out.println(outgoingJsonString); - } - }*/ + private int messagesInBatch; + + @Value("${messagesInTimeInterval}") + private long messagesInTimeInverval; + + @Value("${mapperConfig.file}") + private String mapperConfigFile; + /*public void start(){ + + String incomingJsonString = dmaapService.consume(); + if(!"".equals(incomingJsonString)){ + GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter(); + String outgoingJsonString = eventAdapter.transform(incomingJsonString); + System.out.println(outgoingJsonString); + } + }*/ - /** - * method triggers universal ves adapter module - */ - public void start() { - /*ParallelTasks parallelTasks = new ParallelTasks(); - int processingNumberOfMessage = 0; - while (isRunning) { - try { - for(String incomingJsonString : dmaapService.consumeFromDMaap()){ - parallelTasks.add(() -> processReceivedJson(incomingJsonString)); - processingNumberOfMessage++; - if(processingNumberOfMessage == messagesInBatch){ - parallelTasks.startParallelTasks(); - processingNumberOfMessage=0; - parallelTasks = new ParallelTasks(); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - } - } - } - } catch (DMaapException e) { - } - }*/ - String incomingJsonString = dmaapService.consume(); - processReceivedJson(incomingJsonString); - } + + /** + * method triggers universal ves adapter module. + */ + public void start() { + + try { + String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile)); + MapperConfigUtils.readMapperConfigFile(mappingConfigFileData); + + + ParallelTasks parallelTasks = new ParallelTasks(); + while (isRunning) { + int processingNumberOfMessage = 0; + long start = System.currentTimeMillis(); + for (String incomingJsonString : dmaapService.consumeFromDMaap()) { + parallelTasks.add(() -> processReceivedJson(incomingJsonString)); + processingNumberOfMessage++; + if (processingNumberOfMessage == messagesInBatch + || (System.currentTimeMillis() - start) > messagesInTimeInverval) { + processingNumberOfMessage = 0; + start = System.currentTimeMillis(); + try { + parallelTasks.startParallelTasks(); + } catch (InterruptedException exception) { + LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); + } + parallelTasks = new ParallelTasks(); + } + } + try { + parallelTasks.startParallelTasks(); + } catch (InterruptedException exception) { + LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); + } + parallelTasks = new ParallelTasks(); + } + + /*String incomingJsonString = ""; + incomingJsonString = dmaapService.consume(); + processReceivedJson(incomingJsonString);*/ + } catch (Exception exception) { + LOGGER.error("Reported exception : " + exception.getMessage(), exception); + } + } - /** - * It finds mapping file for received json, transforms json to VES format - * and publishes it to outgoing DMaap MR Topic - * - * @param incomingJsonString - */ - private void processReceivedJson(String incomingJsonString) { - try { - LOGGER.debug("Received incoming message" + incomingJsonString); - if (!"".equals(incomingJsonString)) { - String eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); - LOGGER.debug("Event identified as " + eventType); - String outgoingJsonString; - outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType); - LOGGER.debug("Output VES json to be sent " + outgoingJsonString); - } - } catch (ConfigFileReadException | ConfigFileSmooksConversionException | VesException exception) { - LOGGER.error(exception.getMessage()); - } - } - - /** - * method stops universal ves adapter module - */ - public void stop() { - - isRunning = false; - } + /** + * It finds mapping file for received json, transforms json to VES format + * and publishes it to outgoing DMaap MR Topic + * + * @param incomingJsonString + */ + private void processReceivedJson(String incomingJsonString){ + LOGGER.debug("Received incoming message : " + incomingJsonString); + if (!"".equals(incomingJsonString)) { + String eventType; + try { + eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); + + LOGGER.debug("Event identified as " + eventType); + String outgoingJsonString; + outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType); + LOGGER.debug("Output VES json to be sent " + outgoingJsonString); + +// dmaapService.addMessageInOutgoingQueue(outgoingJsonString); +// LOGGER.debug("Added message in outgoing Queue "); + + dmaapService.publishToDMaap(outgoingJsonString); + LOGGER.debug("Sent message in outgoing Queue "); + + + } catch (VesException exception) { + LOGGER.error("Received exception : " + exception.getMessage(), exception); + + //TODO KKM : Do we wish to continue the application with same exception in every thread?? + LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + ctx.close(); + } + } + } + + /** + * method stops universal ves adapter module + */ + public void stop() { + + isRunning = false; + } } |