diff options
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java')
-rw-r--r-- | UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java | 261 |
1 files changed, 175 insertions, 86 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java index 0b318eb..e463a28 100644 --- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java +++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java @@ -25,15 +25,23 @@ import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration; +import org.onap.universalvesadapter.exception.DMaapException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -//import com.att.nsa.mr.client.MRBatchingPublisher; -//import com.att.nsa.mr.client.MRClientFactory; -//import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRPublisher.message; /** * @@ -46,12 +54,30 @@ import org.springframework.stereotype.Component; @Component public class DMaapService { + private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass()); + @Autowired private DMaapMrUrlConfiguration dmaapMrUrlObject; - -// private MRConsumer cc; -// -// private MRBatchingPublisher pub; + + private MRConsumer consumer; + + private MRBatchingPublisher publisher; + + private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>(); + + /** + * Adds message to outgoing queue that will be sent to DMaaP topic. + * + * @param message outbound message in VES format + */ + public void addMessageInOutgoingQueue(String message) { + if (null != message && !"".equals(message)) { + outgoingMessageQueue.add(message); + eLOGGER.debug("Added message to outgoing queue " + message); + } + } + + /** * reads the messages on DMaap MR Topic @@ -60,23 +86,25 @@ public class DMaapService { * * @throws DMaapException */ - /*public Iterable<String> consumeFromDMaap() throws DMaapException{ - if(null == cc){ - try { - cc = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage()); - } - - try { - return cc.fetch(); - } catch (Exception exception) { - throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage()); - } - } - return () -> Collections.emptyIterator(); - - }*/ + public Iterable<String> consumeFromDMaap() throws DMaapException{ + if(null == consumer){ + try { + consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties()); + eLOGGER.debug("Created consumer"); + } catch (IOException exception) { + throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception); + } + } + + try { + eLOGGER.debug("Returning result fetched by consumer"); + return consumer.fetch(); + } catch (Exception exception) { + throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception); + } +// return () -> Collections.emptyIterator(); + + } /** @@ -85,71 +113,132 @@ public class DMaapService { * * @throws DMaapException */ - /*public void publishToDMaap() throws DMaapException{ - if(null == cc){ - try { - pub = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); - } catch (IOException exception) { - throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage()); - } - - } - + public void publishToDMaap() throws DMaapException{ + if(null == publisher){ + try { + publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); + eLOGGER.debug("Create a publisher now."); + } catch (IOException exception) { + throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); + } + } + for(String message : outgoingMessageQueue){ + try { + publisher.send(message); + eLOGGER.debug("Sending message to DMaaP :-> " + message ); + } catch (IOException exception) { + throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); + } + } + List<message> stuck = null; + try { + stuck = publisher.close ( 20, TimeUnit.SECONDS ); + } catch (IOException | InterruptedException exception) { + throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); + } + if (null != stuck) { + if (stuck.size() > 0) { + eLOGGER.debug(stuck.size() + " messages unsent"); + } else { + eLOGGER.debug("Clean exit; all messages sent."); + } + } + else + throw new DMaapException("Problem while closing publisher, no messages were returned. "); + + } + + /** + * sends the messages to DMaap MR Topic + * + * + * @throws DMaapException + */ + public void publishToDMaap(String outgoingMessage) throws DMaapException{ + if(null == publisher){ + synchronized(publisher){ + if(null == publisher){ + try { + publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties()); + eLOGGER.debug("Publisher created now."); + } catch (IOException exception) { + throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception); + } + } + } + } + try { + publisher.send(outgoingMessage); + eLOGGER.debug("Sent outgoing message " + outgoingMessage); + } catch (IOException exception) { + throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception); + } + List<message> stuck = null; + try { + stuck = publisher.close ( 20, TimeUnit.SECONDS ); + } catch (IOException | InterruptedException exception) { + throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception); + } + if (null != stuck) { + if (stuck.size() > 0) { + eLOGGER.debug(stuck.size() + " messages unsent"); + } else { + eLOGGER.debug("Clean exit; all messages sent."); + } + } else{ + throw new DMaapException("Problem while closing publisher, no messages were returned. "); + } + + } + + + /** + * for local testing only + * @return + * @throws DMaapException + */ + /*public String consume() throws DMaapException { + + URL url; + StringBuffer incomingJson = null; + incomingJson = new StringBuffer(); + try { + url = new URL(dmaapMrUrlObject.getUrl()); + + //open the connection to the above URL. + URLConnection urlcon = url.openConnection(); + + Map<String, List<String>> header = urlcon.getHeaderFields(); + + //print all the fields along with their value. + for (Map.Entry<String, List<String>> mp : header.entrySet()) { + eLOGGER.debug(mp.getKey() + " : "); + eLOGGER.debug(mp.getValue().toString()); + } + eLOGGER.debug("Complete source code of the URL is-"); + eLOGGER.debug("---------------------------------"); + + //get the inputstream of the open connection. + BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream())); + String tempString; + //print the source code line by line. + while ((tempString = br.readLine()) != null) { + eLOGGER.debug(tempString); + incomingJson.append(tempString); + } + + } catch (MalformedURLException exception) { + throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception); + } catch (IOException exception) { + throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception); + } + return incomingJson.toString(); }*/ - /** - * @return - */ - public String consume(){ - URL url; - StringBuffer incomingJson = null; - incomingJson = new StringBuffer(); - try { - url = new URL(dmaapMrUrlObject.getUrl()); - - //open the connection to the above URL. - URLConnection urlcon = url.openConnection(); - - Map<String, List<String>> header = urlcon.getHeaderFields(); - - //print all the fields along with their value. - for (Map.Entry<String, List<String>> mp : header.entrySet()) - { - System.out.print(mp.getKey() + " : "); - System.out.println(mp.getValue().toString()); - } - System.out.println(); - System.out.println("Complete source code of the URL is-"); - System.out.println("---------------------------------"); - - //get the inputstream of the open connection. - BufferedReader br = new BufferedReader(new InputStreamReader - (urlcon.getInputStream())); - String tempString; - //print the source code line by line. - while ((tempString = br.readLine()) != null) - { - System.out.println(tempString); - incomingJson.append(tempString); - } - - } catch (MalformedURLException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return incomingJson.toString(); - } - - - - - - - + + + } |