aboutsummaryrefslogtreecommitdiffstats
path: root/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
diff options
context:
space:
mode:
Diffstat (limited to 'UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java')
-rw-r--r--UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java290
1 files changed, 80 insertions, 210 deletions
diff --git a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
index e463a28..04f333e 100644
--- a/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
+++ b/UniversalVesAdapter/src/main/java/org/onap/universalvesadapter/service/DMaapService.java
@@ -19,226 +19,96 @@
*/
package org.onap.universalvesadapter.service;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import org.onap.universalvesadapter.configs.DMaapMrUrlConfiguration;
+import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
+import org.onap.universalvesadapter.dmaap.Creator;
+import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
+import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
import org.onap.universalvesadapter.exception.DMaapException;
+import org.onap.universalvesadapter.exception.VesException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-/**
- *
- * This service will handle all the communication with the DMaap MR API
- *
- *
- * @author kmalbari
- *
- */
@Component
public class DMaapService {
- private final Logger eLOGGER = LoggerFactory.getLogger(this.getClass());
-
- @Autowired
- private DMaapMrUrlConfiguration dmaapMrUrlObject;
-
- private MRConsumer consumer;
+ private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
+ private static List<String> list = new LinkedList<String>();
+ @Autowired
+ private UniversalEventAdapter eventAdapter;
+
+ /**
+ * It fetches events from DMaap in JSON, transforms JSON to VES format and
+ * publishes it to outgoing DMaap MR Topic
+ *
+ * @param DMaaPMRSubscriber,DMaaPMRPublisher
+ * @return
+ */
+ public void fetchAndPublishInDMaaP(DMaaPMRSubscriber dMaaPMRSubscriber, DMaaPMRPublisher publisher, Creator creater)
+ throws InterruptedException {
+ LOGGER.info("fetch and publish from and to Dmaap started");
+
+ while (true) {
+ synchronized (this) {
+ for (String incomingJsonString : dMaaPMRSubscriber.fetchMessages().getFetchedMessages()) {
+ list.add(incomingJsonString);
+
+ }
+
+ if (list.size() == 0) {
+ Thread.sleep(2000);
+ }
+ LOGGER.debug("number of messages to be converted :{}", list.size());
+
+ if (list.size() != 0) {
+ String val = ((LinkedList<String>) list).removeFirst();
+ List<String> messages = new ArrayList<>();
+ String vesEvent = processReceivedJson(val);
+ if (!(vesEvent.isEmpty() || vesEvent.equals(null) || vesEvent.equals(""))) {
+ messages.add(vesEvent);
+ publisher.publish(messages);
+ LOGGER.info("Message successfully published to DMaaP Topic");
+ }
+
+ }
+
+ }
+ }
+ }
+
+ /**
+ * It finds mapping file for received json, transforms json to VES format
+ *
+ * @param incomingJsonString
+ * @return
+ */
+ private String processReceivedJson(String incomingJsonString) {
+ String outgoingJsonString = null;
+ if (!"".equals(incomingJsonString)) {
+
+ try {
+ /*
+ * For Future events String eventType =
+ * adapterService.identifyEventTypeFromIncomingJson(incomingJsonString);
+ *
+ * LOGGER.debug("Event identified as " + eventType);
+ */
+
+ outgoingJsonString = eventAdapter.transform(incomingJsonString, "snmp");
+
+ } catch (VesException exception) {
+ LOGGER.error("Received exception : " + exception.getMessage(), exception);
+ LOGGER.error("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
+ } catch (DMaapException e) {
+ LOGGER.error("Received exception : ", e.getMessage());
+ }
+ }
+ return outgoingJsonString;
+ }
- private MRBatchingPublisher publisher;
-
- private List<String> outgoingMessageQueue = new CopyOnWriteArrayList<>();
-
- /**
- * Adds message to outgoing queue that will be sent to DMaaP topic.
- *
- * @param message outbound message in VES format
- */
- public void addMessageInOutgoingQueue(String message) {
- if (null != message && !"".equals(message)) {
- outgoingMessageQueue.add(message);
- eLOGGER.debug("Added message to outgoing queue " + message);
- }
- }
-
-
-
- /**
- * reads the messages on DMaap MR Topic
- *
- * @return iterable of messages that will be received on DMaap MR Topic
- *
- * @throws DMaapException
- */
- public Iterable<String> consumeFromDMaap() throws DMaapException{
- if(null == consumer){
- try {
- consumer = MRClientFactory.createConsumer (dmaapMrUrlObject.getConsumerProperties());
- eLOGGER.debug("Created consumer");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating consumer \nReason : " + exception.getMessage(), exception);
- }
- }
-
- try {
- eLOGGER.debug("Returning result fetched by consumer");
- return consumer.fetch();
- } catch (Exception exception) {
- throw new DMaapException("Problem while fetching messaged from consumer \nReason : " + exception.getMessage(), exception);
- }
-// return () -> Collections.emptyIterator();
-
- }
-
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap() throws DMaapException{
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Create a publisher now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- for(String message : outgoingMessageQueue){
- try {
- publisher.send(message);
- eLOGGER.debug("Sending message to DMaaP :-> " + message );
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- }
- else
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
-
- }
-
- /**
- * sends the messages to DMaap MR Topic
- *
- *
- * @throws DMaapException
- */
- public void publishToDMaap(String outgoingMessage) throws DMaapException{
- if(null == publisher){
- synchronized(publisher){
- if(null == publisher){
- try {
- publisher = MRClientFactory.createBatchingPublisher (dmaapMrUrlObject.getPublisherProperties());
- eLOGGER.debug("Publisher created now.");
- } catch (IOException exception) {
- throw new DMaapException("Problem creating publisher \nReason : " + exception.getMessage(), exception);
- }
- }
- }
- }
- try {
- publisher.send(outgoingMessage);
- eLOGGER.debug("Sent outgoing message " + outgoingMessage);
- } catch (IOException exception) {
- throw new DMaapException("Problem sending message to DMaaP topic \nReason : " + exception.getMessage(), exception);
- }
- List<message> stuck = null;
- try {
- stuck = publisher.close ( 20, TimeUnit.SECONDS );
- } catch (IOException | InterruptedException exception) {
- throw new DMaapException("Problem while closing publisher \nReason : " + exception.getMessage(), exception);
- }
- if (null != stuck) {
- if (stuck.size() > 0) {
- eLOGGER.debug(stuck.size() + " messages unsent");
- } else {
- eLOGGER.debug("Clean exit; all messages sent.");
- }
- } else{
- throw new DMaapException("Problem while closing publisher, no messages were returned. ");
- }
-
- }
-
-
- /**
- * for local testing only
- * @return
- * @throws DMaapException
- */
- /*public String consume() throws DMaapException {
-
- URL url;
- StringBuffer incomingJson = null;
- incomingJson = new StringBuffer();
- try {
- url = new URL(dmaapMrUrlObject.getUrl());
-
- //open the connection to the above URL.
- URLConnection urlcon = url.openConnection();
-
- Map<String, List<String>> header = urlcon.getHeaderFields();
-
- //print all the fields along with their value.
- for (Map.Entry<String, List<String>> mp : header.entrySet()) {
- eLOGGER.debug(mp.getKey() + " : ");
- eLOGGER.debug(mp.getValue().toString());
- }
- eLOGGER.debug("Complete source code of the URL is-");
- eLOGGER.debug("---------------------------------");
-
- //get the inputstream of the open connection.
- BufferedReader br = new BufferedReader(new InputStreamReader(urlcon.getInputStream()));
- String tempString;
- //print the source code line by line.
- while ((tempString = br.readLine()) != null) {
- eLOGGER.debug(tempString);
- incomingJson.append(tempString);
- }
-
- } catch (MalformedURLException exception) {
- throw new DMaapException("Problem consuming from url \nReason : " + exception.getMessage(), exception);
- } catch (IOException exception) {
- throw new DMaapException("Problem consuming \nReason : " + exception.getMessage(), exception);
- }
- return incomingJson.toString();
- }*/
-
-
-
-
-
-
-
}