diff options
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap')
11 files changed, 400 insertions, 225 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java index 3a64247..625b021 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileReadException.java @@ -25,7 +25,7 @@ package org.onap.universalvesadapter.exception; * @author kmalbari * */ -public class ConfigFileReadException extends Exception { +public class ConfigFileReadException extends VesException { /** * @@ -35,4 +35,10 @@ public class ConfigFileReadException extends Exception { public ConfigFileReadException(String exceptionMessage) { super(exceptionMessage); } + + public ConfigFileReadException(String exceptionMessage, Exception exception) { + super(exceptionMessage, exception); + } + + } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java index 1daa939..7055bc0 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/ConfigFileSmooksConversionException.java @@ -26,7 +26,7 @@ package org.onap.universalvesadapter.exception; * @author kmalbari * */ -public class ConfigFileSmooksConversionException extends Exception { +public class ConfigFileSmooksConversionException extends VesException { /** * @@ -36,5 +36,9 @@ public class ConfigFileSmooksConversionException extends Exception { public ConfigFileSmooksConversionException(String string) { super(string); } + + public ConfigFileSmooksConversionException(String string, Exception exception) { + super(string, exception); + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java index 5af0205..7a35f83 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/DMaapException.java @@ -25,7 +25,7 @@ package org.onap.universalvesadapter.exception; * @author kmalbari * */ -public class DMaapException extends Exception { +public class DMaapException extends VesException { /** * @@ -36,6 +36,9 @@ public class DMaapException extends Exception { super(string); } + public DMaapException(String string, Exception exception) { + super(string, exception); + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java index a8414d8..3dfa034 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/MapperConfigException.java @@ -19,15 +19,13 @@ */ package org.onap.universalvesadapter.exception; -import java.io.IOException; - /** * Exception thrown during mapping config operations * * @author kmalbari * */ -public class MapperConfigException extends Exception { +public class MapperConfigException extends VesException { /** * @@ -35,11 +33,11 @@ public class MapperConfigException extends Exception { private static final long serialVersionUID = -7876042513908918292L; public MapperConfigException(String string) { - // TODO Auto-generated constructor stub + super(string); } - - public MapperConfigException(String string, IOException exception) { - // TODO Auto-generated constructor stub + + public MapperConfigException(String string, Exception exception) { + super(string, exception); } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java index 31134c8..fd11b89 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/exception/VesException.java @@ -37,4 +37,8 @@ public class VesException extends Exception { public VesException(String string) { super(string); } + + public VesException(String string, Exception exception) { + super(string, exception); + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java index 14c5a83..1e6006a 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/AdapterService.java @@ -17,16 +17,17 @@ * limitations under the License. * ============LICENSE_END========================================================= */ +package org.onap.universalvesadapter.service; -package org.onap.universalvesadapter.service; - +import org.onap.universalvesadapter.exception.MapperConfigException; +import org.onap.universalvesadapter.utils.MapperConfigUtils; //import org.onap.universalvesadapter.adapter.GenericAdapter; //import org.onap.universalvesadapter.adapter.UniversalEventAdapter; //import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * This service is written to identify the different type of events + * This service is written to identify the different type of events * * @author kmalbari * @@ -34,12 +35,11 @@ import org.springframework.stereotype.Component; @Component public class AdapterService { - /*@Autowired - private UniversalEventAdapter snmpTrapEventAdapter; - public GenericAdapter identifyIncomingJsonFormatAndReturnAdapter() { - return snmpTrapEventAdapter; - }*/ - + /* + * @Autowired private UniversalEventAdapter snmpTrapEventAdapter; public + * GenericAdapter identifyIncomingJsonFormatAndReturnAdapter() { return + * snmpTrapEventAdapter; } + */ /** * Identifies eventype by parsing the incoming json file. @@ -47,11 +47,13 @@ public class AdapterService { * @param incomingJsonString * * @return the event type + * @throws MapperConfigException + * if mapper config did not perform correctly */ - public String identifyEventTypeFromIncomingJson(String incomingJsonString) { - - //TODO A proper logic to identify diffeent events is needed here - return "snmp"; + public String identifyEventTypeFromIncomingJson(String incomingJsonString) throws MapperConfigException { + + // TODO A proper logic to identify diffeent events is needed here + return MapperConfigUtils.checkIncomingJsonForMatchingDomain(incomingJsonString); } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java index 09e2592..bf45a1b 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/ConfigFileService.java @@ -22,20 +22,20 @@ package org.onap.universalvesadapter.service; import org.onap.universalvesadapter.exception.ConfigFileReadException; /** - * A contract defined for services that will handle the operations of config file + * A contract defined for services that will handle the operations of config file. * * @author kmalbari * */ public interface ConfigFileService { - - /** - * Returns the config file data - * - * @param fileName - * @return config file content - * @throws ConfigFileReadException if unable to read config file - */ - String readConfigFile(String fileName) throws ConfigFileReadException; - + + /** + * Returns the config file data. + * + * @param fileName file name + * @return config file content + * @throws ConfigFileReadException if unable to read config file + */ + String readConfigFile(String fileName) throws ConfigFileReadException; + } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java index 0b318eb..e463a28 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java @@ -25,15 +25,23 @@ import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration; +import org.onap.universalvesadapter.exception.DMaapException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -//import com.att.nsa.mr.client.MRBatchingPublisher; -//import com.att.nsa.mr.client.MRClientFactory; -//import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRPublisher.message; /** * @@ -46,12 +54,30 @@ import org.springframework.stereotype.Component; @Component public class DMaapService { + private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass()); + @Autowired private DMaapMrUrlConfiguration dmaapMrUrlObject; - -// private MRConsumer cc; -// -// private MRBatchingPublisher pub; + + private MRConsumer consumer; + + private MRBatchingPublisher publisher; + + private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>(); + + /** + * Adds message to outgoing queue that will be sent to DMaaP topic. + * + * @param message outbound message in VES format + */ + public void addMessageInOutgoingQueue(String message) { + if (null != message && !"".equals(message)) { + outgoingMessageQueue.add(message); + eLOGGER.debug("Added message to outgoing queue " + message); + } + } + + /** * reads the messages on DMaap MR Topic @@ -60,23 +86,25 @@ public class DMaapService { * * @throws DMaapException */ - /*public Iterable<String> consumeFromDMaap() throws DMaapException{ - if(null == cc){ - try { - cc = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage()); - } - - try { - return cc.fetch(); - } catch (Exception exception) { - throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage()); - } - } - return () -> Collections.emptyIterator(); - - }*/ + public Iterable<String> consumeFromDMaap() throws DMaapException{ + if(null == consumer){ + try { + consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); + eLOGGER.debug("Created consumer"); + } catch (IOException exception) { + throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception); + } + } + + try { + eLOGGER.debug("Returning result fetched by consumer"); + return consumer.fetch(); + } catch (Exception exception) { + throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception); + } +// return () -> Collections.emptyIterator(); + + } /** @@ -85,71 +113,132 @@ public class DMaapService { * * @throws DMaapException */ - /*public void publishToDMaap() throws DMaapException{ - if(null == cc){ - try { - pub = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage()); - } - - } - + public void publishToDMaap() throws DMaapException{ + if(null == publisher){ + try { + publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); + eLOGGER.debug("Create a publisher now."); + } catch (IOException exception) { + throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); + } + } + for(String message : outgoingMessageQueue){ + try { + publisher.send(message); + eLOGGER.debug("Sending message to DMaaP :-> " + message ); + } catch (IOException exception) { + throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); + } + } + List<message> stuck = null; + try { + stuck = publisher.close ( 20, TimeUnit.SECONDS ); + } catch (IOException | InterruptedException exception) { + throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); + } + if (null != stuck) { + if (stuck.size() > 0) { + eLOGGER.debug(stuck.size() + " messages unsent"); + } else { + eLOGGER.debug("Clean exit; all messages sent."); + } + } + else + throw new DMaapException("Problem while closing publisher, no messages were returned. "); + + } + + /** + * sends the messages to DMaap MR Topic + * + * + * @throws DMaapException + */ + public void publishToDMaap(String outgoingMessage) throws DMaapException{ + if(null == publisher){ + synchronized(publisher){ + if(null == publisher){ + try { + publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); + eLOGGER.debug("Publisher created now."); + } catch (IOException exception) { + throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); + } + } + } + } + try { + publisher.send(outgoingMessage); + eLOGGER.debug("Sent outgoing message " + outgoingMessage); + } catch (IOException exception) { + throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); + } + List<message> stuck = null; + try { + stuck = publisher.close ( 20, TimeUnit.SECONDS ); + } catch (IOException | InterruptedException exception) { + throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); + } + if (null != stuck) { + if (stuck.size() > 0) { + eLOGGER.debug(stuck.size() + " messages unsent"); + } else { + eLOGGER.debug("Clean exit; all messages sent."); + } + } else{ + throw new DMaapException("Problem while closing publisher, no messages were returned. "); + } + + } + + + /** + * for local testing only + * @return + * @throws DMaapException + */ + /*public String consume() throws DMaapException { + + URL url; + StringBuffer incomingJson = null; + incomingJson = new StringBuffer(); + try { + url = new URL(dmaapMrUrlObject.getUrl()); + + //open the connection to the above URL. + URLConnection urlcon = url.openConnection(); + + Map<String, List<String>> header = urlcon.getHeaderFields(); + + //print all the fields along with their value. + for (Map.Entry<String, List<String>> mp : header.entrySet()) { + eLOGGER.debug(mp.getKey() + " : "); + eLOGGER.debug(mp.getValue().toString()); + } + eLOGGER.debug("Complete source code of the URL is-"); + eLOGGER.debug("---------------------------------"); + + //get the inputstream of the open connection. + BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream())); + String tempString; + //print the source code line by line. + while ((tempString = br.readLine()) != null) { + eLOGGER.debug(tempString); + incomingJson.append(tempString); + } + + } catch (MalformedURLException exception) { + throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception); + } catch (IOException exception) { + throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception); + } + return incomingJson.toString(); }*/ - /** - * @return - */ - public String consume(){ - URL url; - StringBuffer incomingJson = null; - incomingJson = new StringBuffer(); - try { - url = new URL(dmaapMrUrlObject.getUrl()); - - //open the connection to the above URL. - URLConnection urlcon = url.openConnection(); - - Map<String, List<String>> header = urlcon.getHeaderFields(); - - //print all the fields along with their value. - for (Map.Entry<String, List<String>> mp : header.entrySet()) - { - System.out.print(mp.getKey() + " : "); - System.out.println(mp.getValue().toString()); - } - System.out.println(); - System.out.println("Complete source code of the URL is-"); - System.out.println("---------------------------------"); - - //get the inputstream of the open connection. - BufferedReader br = new BufferedReader(new InputStreamReader - (urlcon.getInputStream())); - String tempString; - //print the source code line by line. - while ((tempString = br.readLine()) != null) - { - System.out.println(tempString); - incomingJson.append(tempString); - } - - } catch (MalformedURLException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return incomingJson.toString(); - } - - - - - - - + + + } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java index c2a30f0..7c05ced 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DiskRepoConfigFileService.java @@ -32,7 +32,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; /** - * Implementation of {@code ConfigFileService} using disk repository + * Implementation of {@code ConfigFileService} using disk repository. * * @author kmalbari * @@ -40,34 +40,48 @@ import org.springframework.web.client.RestTemplate; @Component public class DiskRepoConfigFileService implements ConfigFileService { - private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private DiskRepoConfiguration diskRepoConfiguration; - - private RestTemplate restTemplate = new RestTemplate(); - - private URI uri = null; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DiskRepoConfiguration diskRepoConfiguration; + + private RestTemplate restTemplate; + + private URI uri = null; - /* (non-Javadoc) - * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String) - */ - @Override - public String readConfigFile(String fileName) throws ConfigFileReadException { - LOGGER.debug("Reading config file for " + fileName); - if(null == uri){ - try { - uri = new URI(diskRepoConfiguration.getFileRepositoryUrl()+fileName); - LOGGER.debug("Read URI for " + fileName); - } catch (URISyntaxException exception) { - throw new ConfigFileReadException("Unable to read config file for file " - + fileName + "\n Reason : " + exception.getMessage()); - } - } - LOGGER.debug("Calling file repo service for URI" + uri); - ResponseEntity<String> fileDataEntity = restTemplate.getForEntity(uri, String.class); - LOGGER.debug("Call completed successfully"); - return fileDataEntity.getBody(); - } + /* (non-Javadoc) + * @see org.onap.universalvesadapter.service.ConfigFileService#readConfigFile(java.lang.String) + */ + @Override + public String readConfigFile(String fileName) throws ConfigFileReadException { + logger.debug("Reading config file for " + fileName); + if (null == uri) { + try { + uri = new URI(diskRepoConfiguration.getFileRepositoryUrl() + fileName); + logger.debug("Read URI for " + fileName); + } catch (URISyntaxException exception) { + throw new ConfigFileReadException("Unable to read config file for file " + + fileName + "\n Reason : " + exception.getMessage(), exception); + } + } + logger.debug("Calling file repo service for URI" + uri); + ResponseEntity<String> fileDataEntity = getRestTemplate().getForEntity(uri, String.class); + logger.debug("Call completed successfully"); + return fileDataEntity.getBody(); + } + + /** + * Instantiates the instance if null and returns it. + * + * @return {@code RestTemplate} instance + */ + private RestTemplate getRestTemplate(){ + + if (null == restTemplate) { + restTemplate = new RestTemplate(); + } + + return restTemplate; + } } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java index bab304a..77769f5 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/MongoDbConfigFileService.java @@ -37,8 +37,14 @@ public class MongoDbConfigFileService implements ConfigFileService { public String readConfigFile(String configFileName){ //HERE CONFIG FILE DATA WOULD COME FROM MONGO DB ConfigFileData configFileData = new ConfigFileData(); - configFileData.setXmlFileName(""); - configFileData.setXmlContent("<?xml version=\"1.0\" encoding=\"UTF-8\"?> <smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> </json:reader> <!-- <jb:bean class=\"com.example.demo.Simple\" beanId=\"simple\" createOnElement=\"simple\"> <jb:value property=\"orderId\" data=\"#/orderId\" /> <jb:value property=\"username\" data=\"#/username\" /> <jb:wiring property=\"customer\" beanIdRef=\"customer\"/> <jb:wiring property=\"orderItems\" beanIdRef=\"orderItems\"/> </jb:bean> --> <jb:bean class=\"org.onap.dcaegen2.ves.domain.VesEvent\" beanId=\"vesEvent\" createOnElement=\"simple\"> <jb:wiring property=\"event\" beanIdRef=\"event\"/> </jb:bean> <jb:bean class=\"org.onap.dcaegen2.ves.domain.Event\" beanId=\"event\" createOnElement=\"simple\"> <jb:wiring property=\"commonEventHeader\" beanIdRef=\"commonEventHeader\"/> <jb:wiring property=\"faultFields\" beanIdRef=\"faultFields\"/> <jb:wiring property=\"measurementsForVfScalingFields\" beanIdRef=\"measurementsForVfScalingFields\"/> </jb:bean> <jb:bean class=\"org.onap.dcaegen2.ves.domain.MeasurementsForVfScalingFields\" beanId=\"measurementsForVfScalingFields\" createOnElement=\"simple\"> <jb:wiring property=\"additionalMeasurements\" beanIdRef=\"additionalMeasurements\"/> </jb:bean> <jb:bean class=\"org.onap.dcaegen2.ves.domain.CommonEventHeader\" beanId=\"commonEventHeader\" createOnElement=\"simple\"> <jb:value property=\"eventId\" data=\"#/community\" /> <jb:value property=\"eventName\" data=\"#/protocol-version\" /> <jb:value property=\"domain\" data=\"#/trap-category\" /> <jb:value property=\"sequence\" data=\"#/time-received\" decoder=\"Long\"/> <jb:value property=\"lastEpochMicrosec\" data=\"#/community-len\" decoder=\"Double\" /> <jb:value property=\"startEpochMicrosec\" data=\"#/notify-OID-len\" /> </jb:bean> <jb:bean class=\"org.onap.dcaegen2.ves.domain.FaultFields\" beanId=\"faultFields\" createOnElement=\"simple\"> <jb:value property=\"alarmCondition\" data=\"#/cambria.partition\" /> <jb:value property=\"eventSeverity\" data=\"#/notify-OID\" /> <jb:value property=\"eventSourceType\" data=\"#/agent-name\" /> <jb:value property=\"specificProblem\" data=\"#/agent-address\" /> <jb:value property=\"faultFieldsVersion\" data=\"#/epoch_serno\" decoder=\"Double\" /> </jb:bean> <jb:bean class=\"java.util.ArrayList\" beanId=\"additionalMeasurements\" createOnElement=\"simple\"> <jb:wiring beanIdRef=\"additionalMeasurement\"/> </jb:bean> <jb:bean class=\"org.onap.dcaegen2.ves.domain.AdditionalMeasurement\" beanId=\"additionalMeasurement\" createOnElement=\"varbinds/element\"> <jb:value property=\"name\" data=\"#/varbind_value\" /> </jb:bean> </smooks-resource-list>"); + configFileData.setXmlFileName(configFileName); + configFileData.setXmlContent("<?xml version=\"1.0\" encoding=\"UTF-8\"?> " + + "<smooks-resource-list xmlns=\"http://www.milyn.org/xsd/smooks-1.1.xsd\" " + + "xmlns:json=\"http://www.milyn.org/xsd/smooks/json-1.1.xsd\" " + + " xmlns:jb=\"http://www.milyn.org/xsd/smooks/javabean-1.2.xsd\"> " + + " <json:reader rootName=\"simple\" keyWhitspaceReplacement=\"-\"> " + + " </json:reader> " + + "</smooks-resource-list>"); return configFileData.getXmlContent(); } diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java index 81cb4b8..112d1d6 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/VesService.java @@ -19,16 +19,28 @@ */ package org.onap.universalvesadapter.service; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; + import javax.annotation.Resource; + +import org.milyn.io.FileUtils; import org.onap.universalvesadapter.adapter.GenericAdapter; import org.onap.universalvesadapter.exception.ConfigFileReadException; import org.onap.universalvesadapter.exception.ConfigFileSmooksConversionException; +import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.exception.MapperConfigException; import org.onap.universalvesadapter.exception.VesException; +import org.onap.universalvesadapter.utils.MapperConfigUtils; +import org.onap.universalvesadapter.utils.ParallelTasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; +import org.springframework.util.FileCopyUtils; /** * Service that starts the universal ves adapter module to listen for events @@ -38,10 +50,13 @@ import org.springframework.stereotype.Component; */ @Component public class VesService { - - private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); - - private boolean isRunning = true; + + private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + + private boolean isRunning = true; + + @Autowired + private ConfigurableApplicationContext ctx; @Autowired private DMaapService dmaapService; @@ -49,80 +64,114 @@ public class VesService { @Autowired private AdapterService adapterService; - @Resource(name="universalEventAdapter") + @Resource(name = "universalEventAdapter") private GenericAdapter eventAdapter; @Value("${messagesInBatch}") - private int messagesInBatch; - - /*public void start(){ - - String incomingJsonString = dmaapService.consume(); - if(!"".equals(incomingJsonString)){ - GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter(); - String outgoingJsonString = eventAdapter.transform(incomingJsonString); - System.out.println(outgoingJsonString); - } - }*/ + private int messagesInBatch; + + @Value("${messagesInTimeInterval}") + private long messagesInTimeInverval; + + @Value("${mapperConfig.file}") + private String mapperConfigFile; + /*public void start(){ + + String incomingJsonString = dmaapService.consume(); + if(!"".equals(incomingJsonString)){ + GenericAdapter eventAdapter = adapterService.identifyIncomingJsonFormatAndReturnAdapter(); + String outgoingJsonString = eventAdapter.transform(incomingJsonString); + System.out.println(outgoingJsonString); + } + }*/ - /** - * method triggers universal ves adapter module - */ - public void start() { - /*ParallelTasks parallelTasks = new ParallelTasks(); - int processingNumberOfMessage = 0; - while (isRunning) { - try { - for(String incomingJsonString : dmaapService.consumeFromDMaap()){ - parallelTasks.add(() -> processReceivedJson(incomingJsonString)); - processingNumberOfMessage++; - if(processingNumberOfMessage == messagesInBatch){ - parallelTasks.startParallelTasks(); - processingNumberOfMessage=0; - parallelTasks = new ParallelTasks(); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - } - } - } - } catch (DMaapException e) { - } - }*/ - String incomingJsonString = dmaapService.consume(); - processReceivedJson(incomingJsonString); - } + + /** + * method triggers universal ves adapter module. + */ + public void start() { + + try { + String mappingConfigFileData = FileCopyUtils.copyToString(new FileReader(mapperConfigFile)); + MapperConfigUtils.readMapperConfigFile(mappingConfigFileData); + + + ParallelTasks parallelTasks = new ParallelTasks(); + while (isRunning) { + int processingNumberOfMessage = 0; + long start = System.currentTimeMillis(); + for (String incomingJsonString : dmaapService.consumeFromDMaap()) { + parallelTasks.add(() -> processReceivedJson(incomingJsonString)); + processingNumberOfMessage++; + if (processingNumberOfMessage == messagesInBatch + || (System.currentTimeMillis() - start) > messagesInTimeInverval) { + processingNumberOfMessage = 0; + start = System.currentTimeMillis(); + try { + parallelTasks.startParallelTasks(); + } catch (InterruptedException exception) { + LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); + } + parallelTasks = new ParallelTasks(); + } + } + try { + parallelTasks.startParallelTasks(); + } catch (InterruptedException exception) { + LOGGER.error("Processing was interrupted due to :" + exception.getMessage()); + } + parallelTasks = new ParallelTasks(); + } + + /*String incomingJsonString = ""; + incomingJsonString = dmaapService.consume(); + processReceivedJson(incomingJsonString);*/ + } catch (Exception exception) { + LOGGER.error("Reported exception : " + exception.getMessage(), exception); + } + } - /** - * It finds mapping file for received json, transforms json to VES format - * and publishes it to outgoing DMaap MR Topic - * - * @param incomingJsonString - */ - private void processReceivedJson(String incomingJsonString) { - try { - LOGGER.debug("Received incoming message" + incomingJsonString); - if (!"".equals(incomingJsonString)) { - String eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); - LOGGER.debug("Event identified as " + eventType); - String outgoingJsonString; - outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType); - LOGGER.debug("Output VES json to be sent " + outgoingJsonString); - } - } catch (ConfigFileReadException | ConfigFileSmooksConversionException | VesException exception) { - LOGGER.error(exception.getMessage()); - } - } - - /** - * method stops universal ves adapter module - */ - public void stop() { - - isRunning = false; - } + /** + * It finds mapping file for received json, transforms json to VES format + * and publishes it to outgoing DMaap MR Topic + * + * @param incomingJsonString + */ + private void processReceivedJson(String incomingJsonString){ + LOGGER.debug("Received incoming message : " + incomingJsonString); + if (!"".equals(incomingJsonString)) { + String eventType; + try { + eventType = adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); + + LOGGER.debug("Event identified as " + eventType); + String outgoingJsonString; + outgoingJsonString = eventAdapter.transform(incomingJsonString, eventType); + LOGGER.debug("Output VES json to be sent " + outgoingJsonString); + +// dmaapService.addMessageInOutgoingQueue(outgoingJsonString); +// LOGGER.debug("Added message in outgoing Queue "); + + dmaapService.publishToDMaap(outgoingJsonString); + LOGGER.debug("Sent message in outgoing Queue "); + + + } catch (VesException exception) { + LOGGER.error("Received exception : " + exception.getMessage(), exception); + + //TODO KKM : Do we wish to continue the application with same exception in every thread?? + LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + ctx.close(); + } + } + } + + /** + * method stops universal ves adapter module + */ + public void stop() { + + isRunning = false; + } } |