aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java261
1 files changed, 175 insertions, 86 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
index 0b318eb..e463a28 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
@@ -25,15 +25,23 @@ import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration;
+import org.onap.universalvesadapter.exception.DMaapException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-//import com.att.nsa.mr.client.MRBatchingPublisher;
-//import com.att.nsa.mr.client.MRClientFactory;
-//import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRPublisher.message;
/**
*
@@ -46,12 +54,30 @@ import org.springframework.stereotype.Component;
@Component
public class DMaapService {
+ private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
+
@Autowired
private DMaapMrUrlConfiguration dmaapMrUrlObject;
-
-// private MRConsumer cc;
-//
-// private MRBatchingPublisher pub;
+
+ private MRConsumer consumer;
+
+ private MRBatchingPublisher publisher;
+
+ private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>();
+
+ /**
+ * Adds message to outgoing queue that will be sent to DMaaP topic.
+ *
+ * @param message outbound message in VES format
+ */
+ public void addMessageInOutgoingQueue(String message) {
+ if (null != message && !"".equals(message)) {
+ outgoingMessageQueue.add(message);
+ eLOGGER.debug("Added message to outgoing queue " + message);
+ }
+ }
+
+
/**
* reads the messages on DMaap MR Topic
@@ -60,23 +86,25 @@ public class DMaapService {
*
* @throws DMaapException
*/
- /*public Iterable<String> consumeFromDMaap() throws DMaapException{
- if(null == cc){
- try {
- cc = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties());
- } catch (IOException exception) {
- throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage());
- }
-
- try {
- return cc.fetch();
- } catch (Exception exception) {
- throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage());
- }
- }
- return () -> Collections.emptyIterator();
-
- }*/
+ public Iterable<String> consumeFromDMaap() throws DMaapException{
+ if(null == consumer){
+ try {
+ consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties());
+ eLOGGER.debug("Created consumer");
+ } catch (IOException exception) {
+ throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception);
+ }
+ }
+
+ try {
+ eLOGGER.debug("Returning result fetched by consumer");
+ return consumer.fetch();
+ } catch (Exception exception) {
+ throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception);
+ }
+// return () -> Collections.emptyIterator();
+
+ }
/**
@@ -85,71 +113,132 @@ public class DMaapService {
*
* @throws DMaapException
*/
- /*public void publishToDMaap() throws DMaapException{
- if(null == cc){
- try {
- pub = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- } catch (IOException exception) {
- throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage());
- }
-
- }
-
+ public void publishToDMaap() throws DMaapException{
+ if(null == publisher){
+ try {
+ publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
+ eLOGGER.debug("Create a publisher now.");
+ } catch (IOException exception) {
+ throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
+ }
+ }
+ for(String message : outgoingMessageQueue){
+ try {
+ publisher.send(message);
+ eLOGGER.debug("Sending message to DMaaP :-> " + message );
+ } catch (IOException exception) {
+ throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
+ }
+ }
+ List<message> stuck = null;
+ try {
+ stuck = publisher.close ( 20, TimeUnit.SECONDS );
+ } catch (IOException | InterruptedException exception) {
+ throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
+ }
+ if (null != stuck) {
+ if (stuck.size() > 0) {
+ eLOGGER.debug(stuck.size() + " messages unsent");
+ } else {
+ eLOGGER.debug("Clean exit; all messages sent.");
+ }
+ }
+ else
+ throw new DMaapException("Problem while closing publisher, no messages were returned. ");
+
+ }
+
+ /**
+ * sends the messages to DMaap MR Topic
+ *
+ *
+ * @throws DMaapException
+ */
+ public void publishToDMaap(String outgoingMessage) throws DMaapException{
+ if(null == publisher){
+ synchronized(publisher){
+ if(null == publisher){
+ try {
+ publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
+ eLOGGER.debug("Publisher created now.");
+ } catch (IOException exception) {
+ throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
+ }
+ }
+ }
+ }
+ try {
+ publisher.send(outgoingMessage);
+ eLOGGER.debug("Sent outgoing message " + outgoingMessage);
+ } catch (IOException exception) {
+ throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
+ }
+ List<message> stuck = null;
+ try {
+ stuck = publisher.close ( 20, TimeUnit.SECONDS );
+ } catch (IOException | InterruptedException exception) {
+ throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
+ }
+ if (null != stuck) {
+ if (stuck.size() > 0) {
+ eLOGGER.debug(stuck.size() + " messages unsent");
+ } else {
+ eLOGGER.debug("Clean exit; all messages sent.");
+ }
+ } else{
+ throw new DMaapException("Problem while closing publisher, no messages were returned. ");
+ }
+
+ }
+
+
+ /**
+ * for local testing only
+ * @return
+ * @throws DMaapException
+ */
+ /*public String consume() throws DMaapException {
+
+ URL url;
+ StringBuffer incomingJson = null;
+ incomingJson = new StringBuffer();
+ try {
+ url = new URL(dmaapMrUrlObject.getUrl());
+
+ //open the connection to the above URL.
+ URLConnection urlcon = url.openConnection();
+
+ Map<String, List<String>> header = urlcon.getHeaderFields();
+
+ //print all the fields along with their value.
+ for (Map.Entry<String, List<String>> mp : header.entrySet()) {
+ eLOGGER.debug(mp.getKey() + " : ");
+ eLOGGER.debug(mp.getValue().toString());
+ }
+ eLOGGER.debug("Complete source code of the URL is-");
+ eLOGGER.debug("---------------------------------");
+
+ //get the inputstream of the open connection.
+ BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream()));
+ String tempString;
+ //print the source code line by line.
+ while ((tempString = br.readLine()) != null) {
+ eLOGGER.debug(tempString);
+ incomingJson.append(tempString);
+ }
+
+ } catch (MalformedURLException exception) {
+ throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception);
+ } catch (IOException exception) {
+ throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception);
+ }
+ return incomingJson.toString();
}*/
- /**
- * @return
- */
- public String consume(){
- URL url;
- StringBuffer incomingJson = null;
- incomingJson = new StringBuffer();
- try {
- url = new URL(dmaapMrUrlObject.getUrl());
-
- //open the connection to the above URL.
- URLConnection urlcon = url.openConnection();
-
- Map<String, List<String>> header = urlcon.getHeaderFields();
-
- //print all the fields along with their value.
- for (Map.Entry<String, List<String>> mp : header.entrySet())
- {
- System.out.print(mp.getKey() + " : ");
- System.out.println(mp.getValue().toString());
- }
- System.out.println();
- System.out.println("Complete source code of the URL is-");
- System.out.println("---------------------------------");
-
- //get the inputstream of the open connection.
- BufferedReader br = new BufferedReader(new InputStreamReader
- (urlcon.getInputStream()));
- String tempString;
- //print the source code line by line.
- while ((tempString = br.readLine()) != null)
- {
- System.out.println(tempString);
- incomingJson.append(tempString);
- }
-
- } catch (MalformedURLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return incomingJson.toString();
- }
-
-
-
-
-
-
-
+
+
+
}