aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java280
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java234
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java211
-rw-r--r--dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java225
4 files changed, 475 insertions, 475 deletions
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
index ec34fe40..de76e454 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
@@ -26,140 +26,156 @@ import java.io.FileInputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DmaapListener {
-
- private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
- private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
- private static final Logger LOG = LoggerFactory
- .getLogger(DmaapListener.class);
-
- public static void main(String[] args) {
-
- Properties properties = new Properties();
-
-
- String propFileName = DMAAP_LISTENER_PROPERTIES;
-
- if (args.length > 0) {
- propFileName = args[0];
- }
-
- String propPath = null;
- String propDir = System.getenv(SDNC_CONFIG_DIR);
-
- List<SdncDmaapConsumer> consumers = new LinkedList();
-
- if (propDir == null) {
-
- propDir = "/opt/sdnc/data/properties";
- }
-
- if (!propFileName.startsWith("/")) {
- propPath = propDir + "/" + propFileName;
- }
-
- File propFile = new File(propPath);
-
- if (!propFile.canRead()) {
- LOG.error("Cannot read properties file "+propPath);
- System.exit(1);
- }
-
- try {
- properties.load(new FileInputStream(propFile));
- } catch (Exception e) {
- LOG.error("Caught exception loading properties from "+propPath, e);
- System.exit(1);
- }
-
- String subscriptionStr = properties.getProperty("subscriptions");
-
- boolean threadsRunning = false;
-
- LOG.debug("Dmaap subscriptions : "+subscriptionStr);
-
- if (subscriptionStr != null) {
- String[] subscriptions = subscriptionStr.split(";");
-
- for (int i = 0; i < subscriptions.length; i++) {
- String[] subscription = subscriptions[i].split(":");
- String consumerClassName = subscription[0];
- String propertyPath = subscription[1];
-
- LOG.debug("Handling subscription [" + consumerClassName + "," + propertyPath + "]");
-
- if (propertyPath == null) {
- LOG.error("Invalid subscription (" + subscriptions[i] + ") property file missing");
- continue;
- }
-
- if (!propertyPath.startsWith("/")) {
- propertyPath = propDir + "/" + propertyPath;
- }
-
- Class<?> consumerClass = null;
-
- try {
- consumerClass = Class.forName(consumerClassName);
- } catch (Exception e) {
- LOG.error("Could not find DMaap consumer class {}", consumerClassName, e);
- }
-
- if (consumerClass != null) {
-
- SdncDmaapConsumer consumer = null;
-
- try {
- consumer = (SdncDmaapConsumer) consumerClass.newInstance();
- } catch (Exception e) {
- LOG.error("Could not create consumer from class " + consumerClassName, e);
- }
-
- if (consumer != null) {
- LOG.debug("Initializing consumer " + consumerClassName + "(" + propertyPath + ")");
- consumer.init(properties, propertyPath);
-
- if (consumer.isReady()) {
- Thread consumerThread = new Thread(consumer);
- consumerThread.start();
- consumers.add(consumer);
- threadsRunning = true;
- LOG.info("Started consumer thread (" + consumerClassName + " : " + propertyPath + ")");
- } else {
- LOG.debug("Consumer " + consumerClassName + " is not ready");
- }
- }
-
- }
-
- }
- }
-
- while (threadsRunning) {
-
- threadsRunning = false;
- for (SdncDmaapConsumer consumer : consumers) {
- if (consumer.isRunning()) {
- threadsRunning = true;
- }
- }
-
- if (!threadsRunning) {
- break;
- }
-
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
-
- }
- }
-
- LOG.info("No listener threads running - exitting");
-
- }
+
+ private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
+ private static final String DMAAP_LISTENER_PROPERTIES_DIR = "/opt/sdnc/data/properties";
+ private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
+ private static final Logger LOG = LoggerFactory.getLogger(DmaapListener.class);
+
+ public static void main(String[] args) {
+
+ Properties properties = new Properties();
+ String propFileName = DMAAP_LISTENER_PROPERTIES;
+ String propPath = null;
+ String propDir = System.getenv(SDNC_CONFIG_DIR);
+ List<SdncDmaapConsumer> consumers = new LinkedList<>();
+
+ if (args.length > 0) {
+ propFileName = args[0];
+ }
+
+ if (propDir == null) {
+ propDir = DMAAP_LISTENER_PROPERTIES_DIR;
+ }
+
+ if (!propFileName.startsWith("/")) {
+ propPath = propDir + "/" + propFileName;
+ }
+
+ if (propPath != null) {
+ properties = loadProperties(propPath, properties);
+
+ String subscriptionStr = properties.getProperty("subscriptions");
+
+ boolean threadsRunning = false;
+
+ LOG.debug("Dmaap subscriptions : " + subscriptionStr);
+
+ if (subscriptionStr != null) {
+ threadsRunning = handleSubscriptions(subscriptionStr, propDir, properties, consumers);
+ }
+
+ while (threadsRunning) {
+ threadsRunning = updateThreadState(consumers);
+ if (!threadsRunning) {
+ break;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+ LOG.info("No listener threads running - exiting");
+ }
+ }
+
+ private static boolean updateThreadState(List<SdncDmaapConsumer> consumers) {
+ boolean threadsRunning = false;
+ for (SdncDmaapConsumer consumer : consumers) {
+ if (consumer.isRunning()) {
+ threadsRunning = true;
+ }
+ }
+ return threadsRunning;
+ }
+
+ private static Properties loadProperties(String propPath, Properties properties) {
+ File propFile = new File(propPath);
+
+ if (!propFile.canRead()) {
+ LOG.error("Cannot read properties file " + propPath);
+ System.exit(1);
+ }
+
+ try (FileInputStream in = new FileInputStream(propFile)) {
+ properties.load(in);
+ } catch (Exception e) {
+ LOG.error("Caught exception loading properties from " + propPath, e);
+ System.exit(1);
+ }
+ return properties;
+ }
+
+ private static boolean handleSubscriptions(String subscriptionStr, String propDir, Properties properties,
+ List<SdncDmaapConsumer> consumers) {
+ String[] subscriptions = subscriptionStr.split(";");
+
+ for (String subscription1 : subscriptions) {
+ String[] subscription = subscription1.split(":");
+ String consumerClassName = subscription[0];
+ String propertyPath = subscription[1];
+
+ LOG.debug(String.format("Handling subscription [%s,%s]", consumerClassName, propertyPath));
+
+ if (propertyPath == null) {
+ LOG.error(String.format("Invalid subscription (%s) property file missing", subscription1));
+ continue;
+ }
+
+ if (!propertyPath.startsWith("/")) {
+ propertyPath = propDir + "/" + propertyPath;
+ }
+
+ Class<?> consumerClass = null;
+
+ try {
+ consumerClass = Class.forName(consumerClassName);
+ } catch (Exception e) {
+ LOG.error("Could not find DMaap consumer class {}", consumerClassName, e);
+ }
+
+ if (consumerClass != null) {
+ return handleConsumerClass(consumerClass, consumerClassName, propertyPath,
+ properties, consumers);
+ }
+ }
+ return false;
+ }
+
+ private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, String propertyPath,
+ Properties properties, List<SdncDmaapConsumer> consumers) {
+
+ SdncDmaapConsumer consumer = null;
+
+ try {
+ consumer = (SdncDmaapConsumer) consumerClass.newInstance();
+ } catch (Exception e) {
+ LOG.error("Could not create consumer from class " + consumerClassName, e);
+ }
+
+ if (consumer != null) {
+ LOG.debug(String.format("Initializing consumer %s(%s)", consumerClassName, propertyPath));
+ consumer.init(properties, propertyPath);
+
+ if (consumer.isReady()) {
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+ consumers.add(consumer);
+
+ LOG.info(String.format("Started consumer thread (%s : %s)", consumerClassName,
+ propertyPath));
+ return true;
+ } else {
+ LOG.debug(String.format("Consumer %s is not ready", consumerClassName));
+ }
+ }
+ return false;
+ }
}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
index c1320d6b..a0c55530 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
@@ -21,126 +21,132 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-
public abstract class SdncDmaapConsumer implements Runnable {
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncDmaapConsumer.class);
-
- private String propertiesPath = "";
- private Properties properties = null;
- MRConsumer consumer = null;
- MRConsumerResponse consumerResponse = null;
- boolean running = false;
- boolean ready = false;
- int fetchPause = 5000; // Default pause between fetchs - 5 seconds
-
- public boolean isReady() {
- return ready;
- }
-
- int timeout = 15000; // Default timeout - 15 seconds
-
- public boolean isRunning() {
- return running;
- }
-
- public SdncDmaapConsumer() {
-
- }
-
- public SdncDmaapConsumer(Properties properties, String propertiesPath) {
- init(properties, propertiesPath);
- }
-
- public String getProperty(String name) {
- return(properties.getProperty(name, ""));
- }
-
- public void init(Properties properties, String propertiesPath) {
-
- this.propertiesPath = propertiesPath;
-
- try {
-
- this.properties = (Properties) properties.clone();
-
- this.properties.load(new FileInputStream(new File(propertiesPath)));
-
- String timeoutStr = properties.getProperty("timeout");
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- try {
- timeout = Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout ("+timeoutStr+")");
- }
- }
-
- String fetchPauseStr = properties.getProperty("fetchPause");
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- try {
- fetchPause = Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric valud specified for fetchPause ("+fetchPauseStr+")");
- }
- }
-
- this.consumer = MRClientFactory.createConsumer(propertiesPath);
- ready = true;
- } catch (Exception e) {
- LOG.error("Error initializing DMaaP consumer from file "+propertiesPath, e);
- }
- }
-
-
- @Override
- public void run() {
- if (ready) {
-
- running = true;
-
- while (running) {
-
- try {
- boolean noData = true;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
- noData = false;
- LOG.info("Received message from DMaaP:\n"+msg);
- processMsg(msg);
- }
-
- if (noData) {
- if (fetchPause > 0) {
-
- LOG.info("No data received from fetch. Pausing "+fetchPause+" ms before retry");
- Thread.sleep(fetchPause);
- } else {
-
- LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
- }
- }
- } catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP", e);
- running = false;
- }
-
-
- }
- }
-
- }
-
- abstract public void processMsg(String msg) throws InvalidMessageException;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SdncDmaapConsumer.class);
+
+ private String propertiesPath = "";
+ private Properties properties = null;
+ private MRConsumer consumer = null;
+ private MRConsumerResponse consumerResponse = null;
+ private boolean running = false;
+ private boolean ready = false;
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+ private int timeout = 15000; // Default timeout - 15 seconds
+
+ public boolean isReady() {
+ return ready;
+ }
+
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public SdncDmaapConsumer() {
+
+ }
+
+ public SdncDmaapConsumer(Properties properties, String propertiesPath) {
+ init(properties, propertiesPath);
+ }
+
+ public String getProperty(String name) {
+ return (properties.getProperty(name, ""));
+ }
+
+ public void init(Properties properties, String propertiesPath) {
+
+ this.propertiesPath = propertiesPath;
+
+ try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
+
+ this.properties = (Properties) properties.clone();
+ this.properties.load(in);
+
+ String timeoutStr = properties.getProperty("timeout");
+
+ if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
+ timeout = parseTimeOutValue(timeoutStr);
+ }
+
+ String fetchPauseStr = properties.getProperty("fetchPause");
+ if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
+ fetchPause = parseFetchPause(fetchPauseStr);
+ }
+
+ this.consumer = MRClientFactory.createConsumer(propertiesPath);
+ ready = true;
+ } catch (Exception e) {
+ LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
+ }
+ }
+
+ private int parseTimeOutValue(String timeoutStr) {
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
+ }
+ return timeout;
+ }
+
+ private int parseFetchPause(String fetchPauseStr) {
+ try {
+ return Integer.parseInt(fetchPauseStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
+ }
+ return fetchPause;
+ }
+
+
+ @Override
+ public void run() {
+ if (ready) {
+
+ running = true;
+
+ while (running) {
+
+ try {
+ boolean noData = true;
+ consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
+ for (String msg : consumerResponse.getActualMessages()) {
+ noData = false;
+ LOG.info("Received message from DMaaP:\n" + msg);
+ processMsg(msg);
+ }
+
+ if (noData) {
+ pauseThread();
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception reading from DMaaP", e);
+ running = false;
+ }
+ }
+ }
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ abstract public void processMsg(String msg) throws InvalidMessageException;
}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
index 60a508e5..1d499a1e 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
@@ -21,163 +21,148 @@
package org.onap.ccsdk.sli.northbound.dmaapclient;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncFlatJsonDmaapConsumer.class);
-
- private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
- private static final String SDNC_ENDPOINT = "SDNC.endpoint";
-
-
-
- @Override
- public void processMsg(String msg) throws InvalidMessageException {
-
- processMsg(msg, null);
- }
-
- public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
-
- if (msg == null) {
- throw new InvalidMessageException("Null message");
- }
-
- ObjectMapper oMapper = new ObjectMapper();
- JsonNode instarRootNode ;
- ObjectNode sdncRootNode;
-
- String instarMsgName = null;
-
- try {
- instarRootNode = oMapper.readTree(msg);
- } catch (Exception e) {
- throw new InvalidMessageException("Cannot parse json object", e);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
- Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
+ private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
+ private static final String SDNC_ENDPOINT = "SDNC.endpoint";
- while (instarFields.hasNext()) {
- Map.Entry<String, JsonNode> entry = instarFields.next();
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
- instarMsgName = entry.getKey();
- instarRootNode = entry.getValue();
- break;
- }
+ processMsg(msg, null);
+ }
- Map<String,String> fieldMap = loadMap(instarMsgName, mapDirName);
+ public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
- if (fieldMap == null) {
- throw new InvalidMessageException("Unable to process message - cannot load field mappings");
- }
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
- if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
- throw new InvalidMessageException("No SDNC endpoint known for message "+instarMsgName);
- }
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode instarRootNode;
+ ObjectNode sdncRootNode;
- String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+ String instarMsgName = null;
- sdncRootNode = oMapper.createObjectNode();
- ObjectNode inputNode = oMapper.createObjectNode();
+ try {
+ instarRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+ Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
- for (Map.Entry<String, String> entry: fieldMap.entrySet()) {
+ while (instarFields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = instarFields.next();
- if (!SDNC_ENDPOINT.equals(entry.getKey())) {
- JsonNode curNode = instarRootNode.get(entry.getKey());
- if (curNode != null) {
- String fromValue = curNode.textValue();
+ instarMsgName = entry.getKey();
+ instarRootNode = entry.getValue();
+ break;
+ }
- inputNode.put(entry.getValue(), fromValue);
- }
- }
- }
- sdncRootNode.put("input", inputNode);
+ Map<String, String> fieldMap = loadMap(instarMsgName, mapDirName);
- try {
- String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
- String odlUrlBase = getProperty("sdnc.odl.url-base");
- String odlUser = getProperty("sdnc.odl.user");
- String odlPassword = getProperty("sdnc.odl.password");
+ if (fieldMap == null) {
+ throw new InvalidMessageException("Unable to process message - cannot load field mappings");
+ }
- if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
- SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ throw new InvalidMessageException("No SDNC endpoint known for message " + instarMsgName);
+ }
- conn.send("POST", "application/json", rpcMsgbody);
- } else {
- LOG.info("POST message body would be:\n"+rpcMsgbody);
- }
- } catch (Exception e) {
- LOG.error("Unable to process message", e);
- }
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
- }
+ sdncRootNode = oMapper.createObjectNode();
+ ObjectNode inputNode = oMapper.createObjectNode();
- private Map<String,String> loadMap(String msgType, String mapDirName) {
- Map<String, String> results = new HashMap<>();
+ for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
+ if (!SDNC_ENDPOINT.equals(entry.getKey())) {
+ JsonNode curNode = instarRootNode.get(entry.getKey());
+ if (curNode != null) {
+ String fromValue = curNode.textValue();
- if (mapDirName == null) {
- String rootdir = System.getenv(DMAAPLISTENERROOT);
+ inputNode.put(entry.getValue(), fromValue);
+ }
+ }
+ }
+ sdncRootNode.put("input", inputNode);
- if ((rootdir == null) || (rootdir.length() == 0)) {
- rootdir = "/opt/app/dmaap-listener";
- }
+ try {
+ String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
- mapDirName = rootdir + "/lib";
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
- }
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
- String mapFilename = mapDirName + "/" + msgType + ".map";
+ private Map<String, String> loadMap(String msgType, String mapDirName) {
+ Map<String, String> results = new HashMap<>();
- File mapFile = new File(mapFilename);
+ if (mapDirName == null) {
+ String rootdir = System.getenv(DMAAPLISTENERROOT);
- if (!mapFile.canRead()) {
- LOG.error("Cannot read map file ("+mapFilename+")");
- return(null);
- }
+ if ((rootdir == null) || (rootdir.length() == 0)) {
+ rootdir = "/opt/app/dmaap-listener";
+ }
- try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
+ mapDirName = rootdir + "/lib";
+ }
- String curLine;
+ String mapFilename = mapDirName + "/" + msgType + ".map";
- while ((curLine = mapReader.readLine()) != null) {
- curLine = curLine.trim();
+ File mapFile = new File(mapFilename);
- if ((curLine.length() > 0) && (!curLine.startsWith("#"))) {
+ if (!mapFile.canRead()) {
+ LOG.error(String.format("Cannot read map file (%s)", mapFilename));
+ return (null);
+ }
- if (curLine.contains("=>")) {
- String[] entry = curLine.split("=>");
- if (entry.length == 2) {
- results.put(entry[0].trim(), entry[1].trim());
- }
- }
- }
- }
- mapReader.close();
- } catch (Exception e) {
- LOG.error("Caught exception reading map "+mapFilename, e);
- return(null);
- }
+ try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
- return(results);
- }
+ String curLine;
+ while ((curLine = mapReader.readLine()) != null) {
+ curLine = curLine.trim();
+ if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
+ String[] entry = curLine.split("=>");
+ if (entry.length == 2) {
+ results.put(entry[0].trim(), entry[1].trim());
+ }
+ }
+ }
+ mapReader.close();
+ } catch (Exception e) {
+ LOG.error("Caught exception reading map " + mapFilename, e);
+ return (null);
+ }
+ return (results);
+ }
}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java
index 6aaf41cd..f88f7bf8 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java
@@ -29,130 +29,123 @@ import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
-
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSession;
-
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SdncOdlConnection {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(SdncOdlConnection.class);
-
- private HttpURLConnection httpConn = null;
-
- private String url = null;
- private String user = null;
- private String password = null;
-
- private class SdncAuthenticator extends Authenticator {
-
- private String user;
- private String passwd;
-
- SdncAuthenticator(String user, String passwd) {
- this.user = user;
- this.passwd = passwd;
- }
- @Override
- protected PasswordAuthentication getPasswordAuthentication() {
- return new PasswordAuthentication(user, passwd.toCharArray());
- }
-
- }
-
- private SdncOdlConnection() {
-
- }
-
- private SdncOdlConnection(String url, String user, String password) {
- this.url = url;
- this.user = user;
- this.password = password;
-
- try {
- URL sdncUrl = new URL(url);
- Authenticator.setDefault(new SdncAuthenticator(user, password));
-
- this.httpConn = (HttpURLConnection) sdncUrl.openConnection();
- } catch (Exception e) {
- LOG.error("Unable to create http connection", e);
- }
- }
-
- public static SdncOdlConnection newInstance(String url, String user, String password) throws IOException
- {
- return new SdncOdlConnection(url, user, password);
- }
-
-
-
- public String send(String method, String contentType, String msg) throws IOException {
-
- LOG.info("Sending REST " + method + " to " + url);
- LOG.info("Message body:\n" + msg);
- String authStr = user + ":" + password;
- String encodedAuthStr = new String(Base64.encodeBase64(authStr.getBytes()));
-
- httpConn.addRequestProperty("Authentication", "Basic " + encodedAuthStr);
-
- httpConn.setRequestMethod(method);
- httpConn.setRequestProperty("Content-Type", contentType);
- httpConn.setRequestProperty("Accept", contentType);
-
- httpConn.setDoInput(true);
- httpConn.setDoOutput(true);
- httpConn.setUseCaches(false);
-
- if (httpConn instanceof HttpsURLConnection) {
- HostnameVerifier hostnameVerifier = new HostnameVerifier() {
- @Override
- public boolean verify(String hostname, SSLSession session) {
- return true;
- }
- };
- ((HttpsURLConnection) httpConn).setHostnameVerifier(hostnameVerifier);
- }
-
- // Write message
- httpConn.setRequestProperty("Content-Length", Integer.toString(msg.length()));
- DataOutputStream outStr = new DataOutputStream(httpConn.getOutputStream());
- outStr.write(msg.getBytes());
- outStr.close();
-
- // Read response
- BufferedReader respRdr;
-
- LOG.info("Response: " + httpConn.getResponseCode() + " " + httpConn.getResponseMessage());
-
- if (httpConn.getResponseCode() < 300) {
-
- respRdr = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
- } else {
- respRdr = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
- }
-
- StringBuffer respBuff = new StringBuffer();
-
- String respLn;
-
- while ((respLn = respRdr.readLine()) != null) {
- respBuff.append(respLn + "\n");
- }
- respRdr.close();
-
- String respString = respBuff.toString();
-
- LOG.info("Response body :\n" + respString);
-
- return respString;
-
- }
-
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SdncOdlConnection.class);
+
+ private HttpURLConnection httpConn = null;
+
+ private String url = null;
+ private String user = null;
+ private String password = null;
+
+ private class SdncAuthenticator extends Authenticator {
+
+ private String user;
+ private String passwd;
+
+ SdncAuthenticator(String user, String passwd) {
+ this.user = user;
+ this.passwd = passwd;
+ }
+
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(user, passwd.toCharArray());
+ }
+ }
+
+ private SdncOdlConnection() {
+
+ }
+
+ private SdncOdlConnection(String url, String user, String password) {
+ this.url = url;
+ this.user = user;
+ this.password = password;
+
+ try {
+ URL sdncUrl = new URL(url);
+ Authenticator.setDefault(new SdncAuthenticator(user, password));
+
+ this.httpConn = (HttpURLConnection) sdncUrl.openConnection();
+ } catch (Exception e) {
+ LOG.error("Unable to create http connection", e);
+ }
+ }
+
+ public static SdncOdlConnection newInstance(String url, String user, String password) throws IOException {
+ return new SdncOdlConnection(url, user, password);
+ }
+
+
+ public String send(String method, String contentType, String msg) throws IOException {
+
+ LOG.info(String.format("Sending REST %s to %s", method, url));
+ LOG.info(String.format("Message body:%n%s", msg));
+ String authStr = user + ":" + password;
+ String encodedAuthStr = new String(Base64.encodeBase64(authStr.getBytes()));
+
+ httpConn.addRequestProperty("Authentication", "Basic " + encodedAuthStr);
+
+ httpConn.setRequestMethod(method);
+ httpConn.setRequestProperty("Content-Type", contentType);
+ httpConn.setRequestProperty("Accept", contentType);
+
+ httpConn.setDoInput(true);
+ httpConn.setDoOutput(true);
+ httpConn.setUseCaches(false);
+
+ if (httpConn instanceof HttpsURLConnection) {
+ HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
+ };
+ ((HttpsURLConnection) httpConn).setHostnameVerifier(hostnameVerifier);
+ }
+
+ // Write message
+ httpConn.setRequestProperty("Content-Length", Integer.toString(msg.length()));
+ DataOutputStream outStr = new DataOutputStream(httpConn.getOutputStream());
+ outStr.write(msg.getBytes());
+ outStr.close();
+
+ // Read response
+ BufferedReader respRdr;
+
+ LOG.info("Response: " + httpConn.getResponseCode() + " " + httpConn.getResponseMessage());
+
+ if (httpConn.getResponseCode() < 300) {
+
+ respRdr = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
+ } else {
+ respRdr = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+ }
+
+ StringBuilder respBuff = new StringBuilder();
+
+ String respLn;
+
+ while ((respLn = respRdr.readLine()) != null) {
+ respBuff.append(respLn).append("\n");
+ }
+ respRdr.close();
+
+ String respString = respBuff.toString();
+
+ LOG.info(String.format("Response body :%n%s", respString));
+
+ return respString;
+ }
}