diff options
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java')
-rw-r--r-- | UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java | 290 |
1 files changed, 80 insertions, 210 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java index e463a28..04f333e 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java @@ -19,226 +19,96 @@ */ package org.onap.universalvesadapter.service; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; -import java.util.Collections; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration; +import org.onap.universalvesadapter.adapter.UniversalEventAdapter; +import org.onap.universalvesadapter.dmaap.Creator; +import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher; +import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber; import org.onap.universalvesadapter.exception.DMaapException; +import org.onap.universalvesadapter.exception.VesException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.MRPublisher.message; - -/** - * - * This service will handle all the communication with the DMaap MR API - * - * - * @author kmalbari - * - */ @Component public class DMaapService { - private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private DMaapMrUrlConfiguration dmaapMrUrlObject; - - private MRConsumer consumer; + private final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + private static List<String> list = new LinkedList<String>(); + @Autowired + private UniversalEventAdapter eventAdapter; + + /** + * It fetches events from DMaap in JSON, transforms JSON to VES format and + * publishes it to outgoing DMaap MR Topic + * + * @param DMaaPMRSubscriber,DMaaPMRPublisher + * @return + */ + public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater) + throws InterruptedException { + LOGGER.info("fetch and publish from and to Dmaap started"); + + while (true) { + synchronized (this) { + for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) { + list.add(incomingJsonString); + + } + + if (list.size() == 0) { + Thread.sleep(2000); + } + LOGGER.debug("number of messages to be converted :{}", list.size()); + + if (list.size() != 0) { + String val = ((LinkedList<String>) list).removeFirst(); + List<String> messages = new ArrayList<>(); + String vesEvent = processReceivedJson(val); + if (!(vesEvent.isEmpty() || vesEvent.equals(null) || vesEvent.equals(""))) { + messages.add(vesEvent); + publisher.publish(messages); + LOGGER.info("Message successfully published to DMaaP Topic"); + } + + } + + } + } + } + + /** + * It finds mapping file for received json, transforms json to VES format + * + * @param incomingJsonString + * @return + */ + private String processReceivedJson(String incomingJsonString) { + String outgoingJsonString = null; + if (!"".equals(incomingJsonString)) { + + try { + /* + * For Future events String eventType = + * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString); + * + * LOGGER.debug("Event identified as " + eventType); + */ + + outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp"); + + } catch (VesException exception) { + LOGGER.error("Received exception : " + exception.getMessage(), exception); + LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED."); + } catch (DMaapException e) { + LOGGER.error("Received exception : ", e.getMessage()); + } + } + return outgoingJsonString; + } - private MRBatchingPublisher publisher; - - private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>(); - - /** - * Adds message to outgoing queue that will be sent to DMaaP topic. - * - * @param message outbound message in VES format - */ - public void addMessageInOutgoingQueue(String message) { - if (null != message && !"".equals(message)) { - outgoingMessageQueue.add(message); - eLOGGER.debug("Added message to outgoing queue " + message); - } - } - - - - /** - * reads the messages on DMaap MR Topic - * - * @return iterable of messages that will be received on DMaap MR Topic - * - * @throws DMaapException - */ - public Iterable<String> consumeFromDMaap() throws DMaapException{ - if(null == consumer){ - try { - consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); - eLOGGER.debug("Created consumer"); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception); - } - } - - try { - eLOGGER.debug("Returning result fetched by consumer"); - return consumer.fetch(); - } catch (Exception exception) { - throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception); - } -// return () -> Collections.emptyIterator(); - - } - - - /** - * sends the messages to DMaap MR Topic - * - * - * @throws DMaapException - */ - public void publishToDMaap() throws DMaapException{ - if(null == publisher){ - try { - publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - eLOGGER.debug("Create a publisher now."); - } catch (IOException exception) { - throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); - } - } - for(String message : outgoingMessageQueue){ - try { - publisher.send(message); - eLOGGER.debug("Sending message to DMaaP :-> " + message ); - } catch (IOException exception) { - throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); - } - } - List<message> stuck = null; - try { - stuck = publisher.close ( 20, TimeUnit.SECONDS ); - } catch (IOException | InterruptedException exception) { - throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); - } - if (null != stuck) { - if (stuck.size() > 0) { - eLOGGER.debug(stuck.size() + " messages unsent"); - } else { - eLOGGER.debug("Clean exit; all messages sent."); - } - } - else - throw new DMaapException("Problem while closing publisher, no messages were returned. "); - - } - - /** - * sends the messages to DMaap MR Topic - * - * - * @throws DMaapException - */ - public void publishToDMaap(String outgoingMessage) throws DMaapException{ - if(null == publisher){ - synchronized(publisher){ - if(null == publisher){ - try { - publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - eLOGGER.debug("Publisher created now."); - } catch (IOException exception) { - throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); - } - } - } - } - try { - publisher.send(outgoingMessage); - eLOGGER.debug("Sent outgoing message " + outgoingMessage); - } catch (IOException exception) { - throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); - } - List<message> stuck = null; - try { - stuck = publisher.close ( 20, TimeUnit.SECONDS ); - } catch (IOException | InterruptedException exception) { - throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); - } - if (null != stuck) { - if (stuck.size() > 0) { - eLOGGER.debug(stuck.size() + " messages unsent"); - } else { - eLOGGER.debug("Clean exit; all messages sent."); - } - } else{ - throw new DMaapException("Problem while closing publisher, no messages were returned. "); - } - - } - - - /** - * for local testing only - * @return - * @throws DMaapException - */ - /*public String consume() throws DMaapException { - - URL url; - StringBuffer incomingJson = null; - incomingJson = new StringBuffer(); - try { - url = new URL(dmaapMrUrlObject.getUrl()); - - //open the connection to the above URL. - URLConnection urlcon = url.openConnection(); - - Map<String, List<String>> header = urlcon.getHeaderFields(); - - //print all the fields along with their value. - for (Map.Entry<String, List<String>> mp : header.entrySet()) { - eLOGGER.debug(mp.getKey() + " : "); - eLOGGER.debug(mp.getValue().toString()); - } - eLOGGER.debug("Complete source code of the URL is-"); - eLOGGER.debug("---------------------------------"); - - //get the inputstream of the open connection. - BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream())); - String tempString; - //print the source code line by line. - while ((tempString = br.readLine()) != null) { - eLOGGER.debug(tempString); - incomingJson.append(tempString); - } - - } catch (MalformedURLException exception) { - throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception); - } catch (IOException exception) { - throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception); - } - return incomingJson.toString(); - }*/ - - - - - - - } |