summaryrefslogtreecommitdiffstats
path: root/northbound/dmaap-listener/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'northbound/dmaap-listener/src/main')
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/A1AdapterPolicyDmaapConsumer.java108
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/CMNotifyDmaapConsumer.java108
-rwxr-xr-xnorthbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java185
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java37
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/InvalidMessageException.java37
-rwxr-xr-xnorthbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java219
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java222
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/OofPciPocDmaapConsumers.java564
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java280
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java130
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java34
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java160
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java170
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java93
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java151
-rw-r--r--northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncRANSliceDmaapConsumer.java92
-rw-r--r--northbound/dmaap-listener/src/main/resources/anr-changes-from-policy-to-sdnr.map5
-rw-r--r--northbound/dmaap-listener/src/main/resources/anr-pci-changes-from-policy-to-sdnr.vt8
-rw-r--r--northbound/dmaap-listener/src/main/resources/edgeRouterStatusChange.map44
-rw-r--r--northbound/dmaap-listener/src/main/resources/esr-thirdparty-sdnc.map5
-rw-r--r--northbound/dmaap-listener/src/main/resources/generic-vnf.map5
-rw-r--r--northbound/dmaap-listener/src/main/resources/log4j2.xml26
-rw-r--r--northbound/dmaap-listener/src/main/resources/pci-changes-from-policy-to-sdnr.map5
-rw-r--r--northbound/dmaap-listener/src/main/resources/preferredRoute.txt1
-rw-r--r--northbound/dmaap-listener/src/main/resources/pserver.map5
-rw-r--r--northbound/dmaap-listener/src/main/resources/template-esr-thirdparty-sdnc.vt100
-rw-r--r--northbound/dmaap-listener/src/main/resources/template-generic-vnf.vt11
-rw-r--r--northbound/dmaap-listener/src/main/resources/template-pserver.vt11
-rw-r--r--northbound/dmaap-listener/src/main/scripts/start-dmaap-listener.sh69
-rw-r--r--northbound/dmaap-listener/src/main/scripts/stop-dmaap-listener.sh52
30 files changed, 2937 insertions, 0 deletions
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/A1AdapterPolicyDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/A1AdapterPolicyDmaapConsumer.java
new file mode 100644
index 000000000..dd59f5868
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/A1AdapterPolicyDmaapConsumer.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2019 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class A1AdapterPolicyDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(A1AdapterPolicyDmaapConsumer.class);
+
+ private static final String BODY = "body";
+ private static final String RPC = "rpc-name";
+ private static final String INPUT = "input";
+ private static final String PAYLOAD = "Payload";
+
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null A1-ADAPTER-DMAAP message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode a1AdapterRootNode;
+ try {
+ a1AdapterRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse A1-ADAPTER-DMAAP json input", e);
+ }
+
+ JsonNode bodyNode = a1AdapterRootNode.get(BODY);
+ if(bodyNode == null) {
+ LOG.warn("Missing body in A1-ADAPTER-DMAAP message");
+ return;
+ }
+
+ JsonNode input = bodyNode.get(INPUT);
+ if(input == null) {
+ LOG.info("Missing input node.");
+ return;
+ }
+
+ JsonNode payloadNode = input.get(PAYLOAD);
+ if(payloadNode == null) {
+ LOG.info("Missing payload node.");
+ return;
+ }
+
+ String rpcMsgbody;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ rpcMsgbody = mapper.writeValueAsString(payloadNode);
+
+ } catch (Exception e) {
+ LOG.error("Unable to parse payload in A1-ADAPTER-DMAAP message", e);
+ return;
+ }
+
+ JsonNode rpcNode = a1AdapterRootNode.get(RPC);
+ if(rpcNode == null) {
+ LOG.warn("Missing node in A1-ADAPTER-DMAAP message- " + RPC);
+ return;
+ }
+ String rpc = rpcNode.textValue();
+ String sdncEndpoint = "A1-ADAPTER-API:" + rpc;
+
+ try {
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+ LOG.info("POST A1-ADAPTER-API Request " + rpcMsgbody);
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.warn("Unable to POST A1-ADAPTER-API message. SDNC URL not available. body:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/CMNotifyDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/CMNotifyDmaapConsumer.java
new file mode 100644
index 000000000..fa14fbb24
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/CMNotifyDmaapConsumer.java
@@ -0,0 +1,108 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2019 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CMNotifyDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CMNotifyDmaapConsumer.class);
+
+ private static final String BODY = "body";
+ private static final String RPC = "rpc-name";
+ private static final String INPUT = "input";
+ private static final String PAYLOAD = "Payload";
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null CMNotify-DMAAP message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode CMNotifyRootNode;
+ try {
+ CMNotifyRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse CMNotify-DMAAP json input", e);
+ }
+
+ JsonNode bodyNode = CMNotifyRootNode.get(BODY);
+ if(bodyNode == null) {
+ LOG.warn("Missing body in CMNotify-DMAAP message");
+ return;
+ }
+
+ JsonNode input = bodyNode.get(INPUT);
+ if(input == null) {
+ LOG.info("Missing input node.");
+ return;
+ }
+
+ JsonNode payloadNode = input.get(PAYLOAD);
+ if(payloadNode == null) {
+ LOG.info("Missing payload node.");
+ return;
+ }
+
+ String rpcMsgbody;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ rpcMsgbody = "{\"input\":" + mapper.writeValueAsString(payloadNode) + "}";
+
+ } catch (Exception e) {
+ LOG.error("Unable to parse payload in CMNotify-DMAAP message", e);
+ return;
+ }
+
+ JsonNode rpcNode = CMNotifyRootNode.get(RPC);
+ if(rpcNode == null) {
+ LOG.warn("Missing node in CMNotify-DMAAP message- " + RPC);
+ return;
+ }
+ String rpc = rpcNode.textValue();
+ String sdncEndpoint = "CM-NOTIFY-API:" + rpc;
+
+ try {
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+ LOG.info("POST CM-NOTIFY-API Request " + rpcMsgbody);
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.warn("Unable to POST CM-NOTIFY-API message. SDNC URL not available. body:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
new file mode 100755
index 000000000..18c00d563
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DmaapListener.java
@@ -0,0 +1,185 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DmaapListener {
+
+ private static final String DMAAP_LISTENER_PROPERTIES = "dmaap-listener.properties";
+ private static final String DMAAP_LISTENER_PROPERTIES_DIR = "/opt/onap/ccsdk/data/properties";
+ private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
+ private static final Logger LOG = LoggerFactory.getLogger(DmaapListener.class);
+
+ public static void main(String[] args) {
+
+ Properties properties = new Properties();
+ String propFileName = DMAAP_LISTENER_PROPERTIES;
+ String propPath = null;
+ String propDir = System.getProperty(SDNC_CONFIG_DIR);
+ if(propDir == null) {
+ propDir = System.getenv(SDNC_CONFIG_DIR);
+ LOG.debug(SDNC_CONFIG_DIR + " read from environment variable with value " + propDir);
+ }
+ List<SdncDmaapConsumer> consumers = new LinkedList<>();
+
+ if (args.length > 0) {
+ propFileName = args[0];
+ }
+
+ if (propDir == null) {
+ propDir = DMAAP_LISTENER_PROPERTIES_DIR;
+ }
+
+ if (!propFileName.startsWith("/")) {
+ propPath = propDir + "/" + propFileName;
+ }
+
+ if (propPath != null) {
+ properties = loadProperties(propPath, properties);
+
+ String subscriptionStr = properties.getProperty("subscriptions");
+
+ boolean threadsRunning = false;
+
+ LOG.debug("Dmaap subscriptions : " + subscriptionStr);
+
+ if (subscriptionStr != null) {
+ threadsRunning = handleSubscriptions(subscriptionStr, propDir, properties, consumers);
+ }
+
+ while (threadsRunning) {
+ threadsRunning = updateThreadState(consumers);
+ if (!threadsRunning) {
+ break;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+ LOG.info("No listener threads running - exiting");
+ }
+ }
+
+ private static boolean updateThreadState(List<SdncDmaapConsumer> consumers) {
+ boolean threadsRunning = false;
+ for (SdncDmaapConsumer consumer : consumers) {
+ if (consumer.isRunning()) {
+ threadsRunning = true;
+ }
+ }
+ return threadsRunning;
+ }
+
+ static Properties loadProperties(String propPath, Properties properties) {
+ File propFile = new File(propPath);
+
+ if (!propFile.canRead()) {
+ LOG.error("Cannot read properties file " + propPath);
+ System.exit(1);
+ }
+
+ try (FileInputStream in = new FileInputStream(propFile)) {
+ properties.load(in);
+ } catch (Exception e) {
+ LOG.error("Caught exception loading properties from " + propPath, e);
+ System.exit(1);
+ }
+ return properties;
+ }
+
+ static boolean handleSubscriptions(String subscriptionStr, String propDir, Properties properties,
+ List<SdncDmaapConsumer> consumers) {
+ String[] subscriptions = subscriptionStr.split(";");
+
+ for (String subscription1 : subscriptions) {
+ String[] subscription = subscription1.split(":");
+ String consumerClassName = subscription[0];
+ String propertyPath = subscription[1];
+
+ LOG.debug(String.format("Handling subscription [%s,%s]", consumerClassName, propertyPath));
+
+ if (propertyPath == null) {
+ LOG.error(String.format("Invalid subscription (%s) property file missing", subscription1));
+ continue;
+ }
+
+ if (!propertyPath.startsWith("/")) {
+ propertyPath = propDir + "/" + propertyPath;
+ }
+
+ Class<?> consumerClass = null;
+
+ try {
+ consumerClass = Class.forName(consumerClassName);
+ } catch (Exception e) {
+ LOG.error("Could not find DMaap consumer class {}", consumerClassName, e);
+ }
+
+ if (consumerClass != null) {
+ handleConsumerClass(consumerClass, consumerClassName, propertyPath,
+ properties, consumers);
+ }
+ }
+ return !consumers.isEmpty();
+ }
+
+ private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, String propertyPath,
+ Properties properties, List<SdncDmaapConsumer> consumers) {
+
+ SdncDmaapConsumer consumer = null;
+
+ try {
+ consumer = (SdncDmaapConsumer) consumerClass.newInstance();
+ } catch (Exception e) {
+ LOG.error("Could not create consumer from class " + consumerClassName, e);
+ }
+
+ if (consumer != null) {
+ LOG.debug(String.format("Initializing consumer %s(%s)", consumerClassName, propertyPath));
+ consumer.init(properties, propertyPath);
+
+ if (consumer.isReady()) {
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+ consumers.add(consumer);
+
+ LOG.info(String.format("Started consumer thread (%s : %s)", consumerClassName,
+ propertyPath));
+ return true;
+ } else {
+ LOG.debug(String.format("Consumer %s is not ready", consumerClassName));
+ }
+ }
+ return false;
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
new file mode 100644
index 000000000..57fcd8809
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DummyDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DummyDmaapConsumer.class);
+
+ @Override
+ public void processMsg(String msg) {
+ LOG.info("Consumed message: \n"+msg);
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/InvalidMessageException.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/InvalidMessageException.java
new file mode 100644
index 000000000..cab8b901c
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/InvalidMessageException.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+public class InvalidMessageException extends Exception {
+
+ public InvalidMessageException() {
+ super();
+ }
+
+ public InvalidMessageException(String msg) {
+ super(msg);
+ }
+
+ public InvalidMessageException(String msg, Throwable t) {
+ super(msg, t);
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
new file mode 100755
index 000000000..2a9e0b145
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
@@ -0,0 +1,219 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.Invocation.Builder;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * jax-rs based client to build message router consumers
+ */
+public class MessageRouterHttpClient implements SdncDmaapConsumer {
+ private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClient.class);
+
+ protected Boolean isReady = false;
+ protected Boolean isRunning = false;
+ protected Client client;
+ protected URI uri;
+ protected Invocation getMessages;
+ protected Integer fetchPause;
+ protected Properties properties;
+ protected final String DEFAULT_CONNECT_TIMEOUT_SECONDS = "30";
+ protected final String DEFAULT_READ_TIMEOUT_MINUTES = "3";
+ protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
+ protected final String DEFAULT_LIMIT = null;
+ protected final String DEFAULT_FETCH_PAUSE = "5000";
+
+ public MessageRouterHttpClient() {
+
+ }
+
+ @Override
+ public void run() {
+ if (isReady) {
+ isRunning = true;
+ while (isRunning) {
+ try {
+ Response response = getMessages.invoke();
+ Log.info("GET " + uri + " returned http status " + response.getStatus());
+ String entity = response.readEntity(String.class);
+ if (response.getStatus() < 300) {
+ if (entity.contains("{")) {
+ // Get rid of opening ["
+ entity = entity.substring(2);
+ // Get rid of closing "]
+ entity = entity.substring(0, entity.length() - 2);
+ // This replacement effectively un-escapes the JSON
+ for (String message : entity.split("\",\"")) {
+ try {
+ processMsg(message.replace("\\\"", "\""));
+ } catch (InvalidMessageException e) {
+ Log.error("Message could not be processed", e);
+ }
+ }
+ } else {
+ if (entity.length() < 1) {
+ Log.info("GET was successful, but the server returned an empty message body.");
+ } else {
+ Log.info(
+ "GET was successful, but entity is not valid JSON. Message body will be logged, but not processed");
+ Log.info(entity);
+ }
+ }
+ } else {
+ Log.info("GET failed, message body will be logged, but not processed.");
+ Log.info(entity);
+ }
+ } catch (Exception e) {
+ Log.error("GET " + uri + " failed.", e);
+ } finally {
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + uri + " again.");
+ try {
+ Thread.sleep(fetchPause);
+ } catch (InterruptedException e) {
+ Log.error("Could not sleep thread", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void init(Properties baseProperties, String consumerPropertiesPath) {
+ try {
+ baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
+ processProperties(baseProperties);
+ } catch (FileNotFoundException e) {
+ Log.error("FileNotFoundException while reading consumer properties", e);
+ } catch (IOException e) {
+ Log.error("IOException while reading consumer properties", e);
+ }
+ }
+
+ protected void processProperties(Properties properties) {
+ this.properties = properties;
+ String username = properties.getProperty("username");
+ String password = properties.getProperty("password");
+ String topic = properties.getProperty("topic");
+ String group = properties.getProperty("group");
+ String host = properties.getProperty("host");
+ String id = properties.getProperty("id");
+
+ String filter = properties.getProperty("filter");
+ if (filter != null) {
+ if (filter.length() > 0) {
+ try {
+ filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ Log.error("Filter could not be encoded, setting to null", e);
+ filter = null;
+ }
+ } else {
+ filter = null;
+ }
+ }
+
+ String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
+ Integer limit = null;
+ if (limitString != null && limitString.length() > 0) {
+ limit = Integer.valueOf(limitString);
+ }
+
+ Integer timeoutQueryParamValue =
+ Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
+ Integer connectTimeoutSeconds = Integer
+ .valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS));
+ Integer readTimeoutMinutes =
+ Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES));
+ this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes);
+ this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter);
+ Builder builder = client.target(uri).request("application/json");
+ if (username != null && password != null && username.length() > 0 && password.length() > 0) {
+ String authorizationString = buildAuthorizationString(username, password);
+ builder.header("Authorization", authorizationString);
+ }
+
+ this.getMessages = builder.buildGet();
+ this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause",DEFAULT_FETCH_PAUSE));
+ this.isReady = true;
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+ System.out.println(msg);
+ }
+
+ @Override
+ public boolean isReady() {
+ return isReady;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ protected String buildAuthorizationString(String userName, String password) {
+ String basicAuthString = userName + ":" + password;
+ basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
+ return "Basic " + basicAuthString;
+ }
+
+ protected Client getClient(Integer connectTimeoutSeconds, Integer readTimeoutMinutes) {
+ ClientBuilder clientBuilder = ClientBuilder.newBuilder();
+ clientBuilder.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS);
+ clientBuilder.readTimeout(readTimeoutMinutes, TimeUnit.MINUTES);
+ return clientBuilder.build();
+ }
+
+ protected URI buildUri(String topic, String consumerGroup, String consumerId, String host, Integer timeout,
+ Integer limit, String filter) {
+ UriBuilder builder = UriBuilder.fromPath("http://" + host + "/events/{topic}/{consumerGroup}/{consumderId}");
+ builder.queryParam("timeout", timeout);
+ if (limit != null) {
+ builder.queryParam("limit", limit);
+ }
+ if (filter != null) {
+ builder.queryParam("filter", filter);
+ }
+ return builder.build(topic, consumerGroup, consumerId);
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java
new file mode 100644
index 000000000..a6744045d
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java
@@ -0,0 +1,222 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * java.net based client to build message router consumers
+ */
+public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
+ private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
+
+ protected Boolean isReady = false;
+ protected Boolean isRunning = false;
+ protected URL url;
+ protected Integer fetchPause;
+ protected Properties properties;
+ protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
+ protected final String DEFAULT_READ_TIMEOUT = "180000";
+ protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
+ protected final String DEFAULT_LIMIT = null;
+ protected final String DEFAULT_FETCH_PAUSE = "5000";
+
+ private String authorizationString;
+ protected Integer connectTimeout;
+ protected Integer readTimeout;
+ protected String topic;
+
+ public MessageRouterHttpClientJdk() {}
+
+ @Override
+ public void run() {
+ if (isReady) {
+ isRunning = true;
+ while (isRunning) {
+ HttpURLConnection httpUrlConnection = null;
+ try {
+ httpUrlConnection = buildHttpURLConnection();
+ httpUrlConnection.connect();
+ int status = httpUrlConnection.getResponseCode();
+ Log.info("GET " + url + " returned http status " + status);
+ if (status < 300) {
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = br.readLine()) != null) {
+ sb.append(line + "\n");
+ }
+ br.close();
+ String responseBody = sb.toString();
+ if (responseBody.contains("{")) {
+ // Get rid of opening [" entity =
+ responseBody = responseBody.substring(2);
+ // Get rid of closing "]
+ responseBody = responseBody.substring(0, responseBody.length() - 2);
+ // Split the json array into individual elements to process
+ for (String message : responseBody.split("\",\"")) {
+ // unescape the json
+ message = message.replace("\\\"", "\"");
+ // Topic names cannot contain periods
+ processMsg(message);
+ }
+ } else {
+ Log.info("Entity doesn't appear to contain JSON elements, logging body");
+ Log.info(responseBody);
+ }
+ }
+ } catch (Exception e) {
+ Log.error("GET " + url + " failed.", e);
+ } finally {
+ if (httpUrlConnection != null) {
+ httpUrlConnection.disconnect();
+ }
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
+ try {
+ Thread.sleep(fetchPause);
+ } catch (InterruptedException e) {
+ Log.error("Could not sleep thread", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void init(Properties baseProperties, String consumerPropertiesPath) {
+ try {
+ baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
+ processProperties(baseProperties);
+ } catch (FileNotFoundException e) {
+ Log.error("FileNotFoundException while reading consumer properties", e);
+ } catch (IOException e) {
+ Log.error("IOException while reading consumer properties", e);
+ }
+ }
+
+ protected void processProperties(Properties properties) throws MalformedURLException {
+ this.properties = properties;
+ String username = properties.getProperty("username");
+ String password = properties.getProperty("password");
+ topic = properties.getProperty("topic");
+ String group = properties.getProperty("group");
+ String host = properties.getProperty("host");
+ String id = properties.getProperty("id");
+
+ String filter = properties.getProperty("filter");
+ if (filter != null) {
+ if (filter.length() > 0) {
+ try {
+ filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ Log.error("Couldn't encode filter string", e);
+ }
+ } else {
+ filter = null;
+ }
+ }
+
+ String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
+ Integer limit = null;
+ if (limitString != null && limitString.length() > 0) {
+ limit = Integer.valueOf(limitString);
+ }
+
+ Integer timeoutQueryParamValue =
+ Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
+ connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
+ readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
+ if (username != null && password != null && username.length() > 0 && password.length() > 0) {
+ authorizationString = buildAuthorizationString(username, password);
+ }
+ String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
+ this.url = new URL(urlString);
+ this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause", DEFAULT_FETCH_PAUSE));
+ this.isReady = true;
+ }
+
+ public void processMsg(String msg) {
+ Log.info(msg);
+ }
+
+ protected String buildAuthorizationString(String userName, String password) {
+ String basicAuthString = userName + ":" + password;
+ basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
+ return "Basic " + basicAuthString;
+ }
+
+ protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
+ Integer timeout, Integer limit, String filter) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
+ sb.append("?timeout=" + timeout);
+
+ if (limit != null) {
+ sb.append("&limit=" + limit);
+ }
+ if (filter != null) {
+ sb.append("&filter=" + filter);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean isReady() {
+ return isReady;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ protected HttpURLConnection buildHttpURLConnection() throws IOException {
+ HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
+ if (authorizationString != null) {
+ httpUrlConnection.setRequestProperty("Authorization", authorizationString);
+ }
+ httpUrlConnection.setRequestMethod("GET");
+ httpUrlConnection.setRequestProperty("Accept", "application/json");
+ httpUrlConnection.setUseCaches(false);
+ httpUrlConnection.setConnectTimeout(connectTimeout);
+ httpUrlConnection.setReadTimeout(readTimeout);
+ return httpUrlConnection;
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/OofPciPocDmaapConsumers.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/OofPciPocDmaapConsumers.java
new file mode 100644
index 000000000..9ff6fd616
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/OofPciPocDmaapConsumers.java
@@ -0,0 +1,564 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class OofPciPocDmaapConsumers extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OofPciPocDmaapConsumers.class);
+ private static final String SDNC_ENDPOINT = "SDNC.endpoint";
+ private static final String TEMPLATE = "SDNC.template";
+ private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
+ private static final String UTF_8 = "UTF-8";
+
+ private static final String PARAMETER_NAME = "parameter-name";
+ private static final String STRING_VALUE = "string-value";
+ private static final String PHYSICAL_CELL_ID_INPUT_FAP_SERVICE = "configuration-phy-cell-id-input.fap-service";
+ private static final String EVENT_HEADER = "event-header";
+ private static final String ACTION = "Action";
+ private static final String CONFIGURATIONS = "Configurations";
+ private static final String MODIFY_CONFIG = "ModifyConfig";
+ private static final String DATA = "data";
+ private static final String FAP_SERVICE = "FAPService";
+
+ private static final String PAYLOAD = "Payload";
+ private static final String PCI_CHANGES_MAP_FILE_NAME = "pci-changes-from-policy-to-sdnr";
+ private static final String SLI_PARAMETERS = "sli_parameters";
+ private static final String RPC_NAME = "rpc-name";
+ private static final String BODY = "body";
+ private static final String INPUT = "input";
+ private static final String COMMON_HEADER = "CommonHeader";
+ private static final String TIME_STAMP = "TimeStamp";
+ private static final String REQUEST_ID = "RequestID";
+ private static final String SUB_REQUEST_ID = "SubRequestID";
+
+ private static final String TIME_STAMP_FOR_SLI = "timeStamp";
+ private static final String REQUEST_ID_FOR_SLI = "requestID";
+ private static final String SUB_REQUEST_ID_FOR_SLI = "subRequestID";
+
+ private static final String CONFIGURATION_PHY_CELL_ID_INPUT = "configuration-phy-cell-id-input.";
+
+ private static final String EMPTY = "";
+ private static final String ESCAPE_SEQUENCE_QUOTES = "\"";
+
+ private static final String GENERIC_NEIGHBOR_CONFIGURATION_INPUT = "generic-neighbor-configuration-input.";
+ private static final String GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE = GENERIC_NEIGHBOR_CONFIGURATION_INPUT.concat("neighbor-list-in-use");
+ private static final String MODIFY_CONFIG_ANR = "ModifyConfigANR";
+ private static final String ANR_CHANGES_MAP_FILE_NAME = "anr-changes-from-policy-to-sdnr";
+
+ private String rootDir;
+
+ protected VelocityEngine velocityEngine;
+
+ public OofPciPocDmaapConsumers() {
+ velocityEngine = new VelocityEngine();
+ Properties props = new Properties();
+ rootDir = System.getenv(DMAAPLISTENERROOT);
+
+ if ((rootDir == null) || (rootDir.length() == 0)) {
+ rootDir = "/opt/onap/sdnc/dmaap-listener/lib/";
+ }
+ else {
+ rootDir = rootDir + "/lib/";
+ }
+
+ props.put("file.resource.loader.path", rootDir);
+ velocityEngine.init(props);
+ }
+
+ /*
+ * for testing purposes
+ */
+ OofPciPocDmaapConsumers(Properties props) {
+ velocityEngine = new VelocityEngine();
+ velocityEngine.init(props);
+ }
+
+ protected String publish(String templatePath, String jsonString, JsonNode configurationsOrDataNode, boolean invokePciChangesPublish, boolean invokeAnrChangesPublish) throws IOException, InvalidMessageException
+ {
+ if (invokePciChangesPublish){
+ return publishPciChangesFromPolicyToSDNR(templatePath, configurationsOrDataNode, jsonString);
+ } else if (invokeAnrChangesPublish){
+ return publishANRChangesFromPolicyToSDNR(templatePath, configurationsOrDataNode, jsonString);
+ } else {
+ return publishFullMessage(templatePath, jsonString);
+ }
+ }
+
+ private String publishFullMessage(String templatePath, String jsonString) throws IOException
+ {
+ JSONObject jsonObj = new JSONObject(jsonString);
+ VelocityContext context = new VelocityContext();
+ for(Object key : jsonObj.keySet())
+ {
+ context.put((String)key, jsonObj.get((String)key));
+ }
+
+ String id = jsonObj.getJSONObject(EVENT_HEADER).get("id").toString();
+ context.put("req_id", id);
+
+ context.put("curr_time", Instant.now());
+
+ ObjectMapper oMapper = new ObjectMapper();
+
+ String rpcMsgbody = oMapper.writeValueAsString(jsonString);
+ context.put("full_message", rpcMsgbody);
+
+ Writer writer = new StringWriter();
+ velocityEngine.mergeTemplate(templatePath, UTF_8, context, writer);
+ writer.flush();
+
+ return writer.toString();
+ }
+
+ private String publishANRChangesFromPolicyToSDNR(String templatePath, JsonNode dataNode, String msg) throws IOException, InvalidMessageException
+ {
+ VelocityContext context = new VelocityContext();
+
+ String RPC_NAME_KEY_IN_VT = "rpc_name";
+ String RPC_NAME_VALUE_IN_VT = "generic-neighbor-configuration";
+
+ String CELL_CONFIG = "CellConfig";
+ String ALIAS_LABEL = "alias";
+ String LTE = "LTE";
+ String RAN = "RAN";
+ String LTE_CELL = "LTECell";
+ String NEIGHBOR_LIST_IN_USE = "NeighborListInUse";
+
+ JSONObject numberOfEntries = new JSONObject();
+ JSONObject alias = new JSONObject();
+ JSONArray sliParametersArray = new JSONArray();
+
+ ObjectMapper oMapper = new ObjectMapper();
+
+ JsonNode dmaapMessageRootNode;
+ try {
+ dmaapMessageRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+ JsonNode commonHeader = dmaapMessageRootNode.get(BODY).get(INPUT).get(COMMON_HEADER);
+
+ JsonNode timeStamp = commonHeader.get(TIME_STAMP);
+
+ JsonNode requestID = commonHeader.get(REQUEST_ID);
+
+ JsonNode subRequestID = commonHeader.get(SUB_REQUEST_ID);
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT+TIME_STAMP_FOR_SLI).put(STRING_VALUE,timeStamp));
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT+REQUEST_ID_FOR_SLI).put(STRING_VALUE,requestID));
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT+SUB_REQUEST_ID_FOR_SLI).put(STRING_VALUE,subRequestID));
+
+ String aliasValue = dataNode.get(DATA).get(FAP_SERVICE).get(ALIAS_LABEL).textValue();
+
+ JsonNode nbrListInUse = dataNode.get(DATA).get(FAP_SERVICE).get(CELL_CONFIG).get(LTE).get(RAN).get(NEIGHBOR_LIST_IN_USE).get(LTE_CELL);
+
+ int entryCount = 0;
+
+ if(nbrListInUse.isArray()) {
+ for(JsonNode lteCell:nbrListInUse) {
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE+"["+entryCount+"]."+"plmnid")
+ .put(STRING_VALUE, lteCell.get("PLMNID").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE+"["+entryCount+"]."+"cid")
+ .put(STRING_VALUE, lteCell.get("CID").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE+"["+entryCount+"]."+"phy-cell-id")
+ .put(STRING_VALUE, lteCell.get("PhyCellID").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE+"["+entryCount+"]."+"pnf-name")
+ .put(STRING_VALUE, lteCell.get("PNFName").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT_NEIGHBOR_LIST_IN_USE+"["+entryCount+"]."+"blacklisted")
+ .put(STRING_VALUE, lteCell.get("Blacklisted").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+
+ entryCount++;
+ }
+
+ alias.put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT+ALIAS_LABEL);
+ alias.put(STRING_VALUE, aliasValue);
+
+ numberOfEntries.put(PARAMETER_NAME, GENERIC_NEIGHBOR_CONFIGURATION_INPUT+"lte-cell-number-of-entries");
+ numberOfEntries.put(STRING_VALUE, entryCount);
+
+ sliParametersArray.put(alias);
+ sliParametersArray.put(numberOfEntries);
+
+ context.put(SLI_PARAMETERS, sliParametersArray);
+
+ context.put(RPC_NAME_KEY_IN_VT, RPC_NAME_VALUE_IN_VT);
+
+ Writer writer = new StringWriter();
+ velocityEngine.mergeTemplate(templatePath, UTF_8, context, writer);
+ writer.flush();
+
+ return writer.toString();
+
+ }else {
+ throw new InvalidMessageException("nbrListInUse is not of Type Array. Could not read neighbor list elements");
+ }
+
+ }
+
+ private String publishPciChangesFromPolicyToSDNR(String templatePath, JsonNode configurationsJsonNode, String msg) throws IOException, InvalidMessageException
+ {
+ String RPC_NAME_KEY_IN_VT = "rpc_name";
+ String RPC_NAME_VALUE_IN_VT = "configuration-phy-cell-id";
+ String ALIAS = "alias";
+ String X0005b9Lte = "X0005b9Lte";
+
+ VelocityContext context = new VelocityContext();
+
+ JSONObject numberOfEntries = new JSONObject();
+ JSONArray sliParametersArray = new JSONArray();
+
+ JsonNode configurations = configurationsJsonNode.get(CONFIGURATIONS);
+
+ ObjectMapper oMapper = new ObjectMapper();
+
+ JsonNode dmaapMessageRootNode;
+ try {
+ dmaapMessageRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+ JsonNode commonHeader = dmaapMessageRootNode.get(BODY).get(INPUT).get(COMMON_HEADER);
+
+ JsonNode timeStamp = commonHeader.get(TIME_STAMP);
+
+ JsonNode requestID = commonHeader.get(REQUEST_ID);
+
+ JsonNode subRequestID = commonHeader.get(SUB_REQUEST_ID);
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, CONFIGURATION_PHY_CELL_ID_INPUT+TIME_STAMP_FOR_SLI).put(STRING_VALUE,timeStamp));
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, CONFIGURATION_PHY_CELL_ID_INPUT+REQUEST_ID_FOR_SLI).put(STRING_VALUE,requestID));
+
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, CONFIGURATION_PHY_CELL_ID_INPUT+SUB_REQUEST_ID_FOR_SLI).put(STRING_VALUE,subRequestID));
+
+ int entryCount = 0;
+
+ if(configurations.isArray()) {
+ for(JsonNode dataNode:configurations) {
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, PHYSICAL_CELL_ID_INPUT_FAP_SERVICE+"["+entryCount+"]."+ALIAS)
+ .put(STRING_VALUE, dataNode.get(DATA).get(FAP_SERVICE).get(ALIAS).toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, PHYSICAL_CELL_ID_INPUT_FAP_SERVICE+"["+entryCount+"]."+"cid")
+ .put(STRING_VALUE, dataNode.get(DATA).get(FAP_SERVICE).get("CellConfig").get("LTE").get("RAN").get("Common").get("CellIdentity").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, PHYSICAL_CELL_ID_INPUT_FAP_SERVICE+"["+entryCount+"]."+"phy-cell-id-in-use")
+ .put(STRING_VALUE, dataNode.get(DATA).get(FAP_SERVICE).get(X0005b9Lte).get("phyCellIdInUse").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ sliParametersArray.put(new JSONObject().put(PARAMETER_NAME, PHYSICAL_CELL_ID_INPUT_FAP_SERVICE+"["+entryCount+"]."+"pnf-name")
+ .put(STRING_VALUE, dataNode.get(DATA).get(FAP_SERVICE).get(X0005b9Lte).get("pnfName").toString().replace(ESCAPE_SEQUENCE_QUOTES, EMPTY)));
+ entryCount++;
+ }
+
+ numberOfEntries.put(PARAMETER_NAME, PHYSICAL_CELL_ID_INPUT_FAP_SERVICE+"-number-of-entries");
+ numberOfEntries.put(STRING_VALUE, entryCount);
+
+ sliParametersArray.put(numberOfEntries);
+
+ context.put(SLI_PARAMETERS, sliParametersArray);
+
+ context.put(RPC_NAME_KEY_IN_VT, RPC_NAME_VALUE_IN_VT);
+
+ Writer writer = new StringWriter();
+ velocityEngine.mergeTemplate(templatePath, UTF_8, context, writer);
+ writer.flush();
+
+ return writer.toString();
+
+ }else {
+ throw new InvalidMessageException("Configurations is not of Type Array. Could not read configuration changes");
+ }
+
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode dmaapMessageRootNode;
+ try {
+ dmaapMessageRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+
+ JsonNode rpcnameNode = dmaapMessageRootNode.get(RPC_NAME);
+ if(rpcnameNode == null) {
+ LOG.info("Unable to identify the respective consumer to invoke. Please verify the dmaap message..");
+ return;
+ }
+
+ String rpcname = rpcnameNode.textValue();
+
+ if(!MODIFY_CONFIG.toLowerCase().equals(rpcname) && !MODIFY_CONFIG_ANR.toLowerCase().equals(rpcname)) {
+ LOG.info("Unknown rpc name {}", rpcname);
+ return;
+ }
+
+ if(MODIFY_CONFIG.toLowerCase().equals(rpcname)) {
+ invokePCIChangesConsumer(dmaapMessageRootNode, oMapper, msg);
+ return;
+ }
+
+ if(MODIFY_CONFIG_ANR.toLowerCase().equals(rpcname)) {
+ invokeANRChangesConsumer(dmaapMessageRootNode, oMapper, msg);
+ return;
+ }
+
+ }
+
+ private void invokeANRChangesConsumer(JsonNode dmaapMessageRootNode, ObjectMapper oMapper,
+ String msg) throws InvalidMessageException {
+ JsonNode body = dmaapMessageRootNode.get(BODY);
+ if(body == null) {
+ LOG.info("Missing body node.");
+ return;
+ }
+
+ JsonNode input = body.get(INPUT);
+ if(input == null) {
+ LOG.info("Missing input node.");
+ return;
+ }
+
+ JsonNode action = input.get(ACTION);
+ if(action == null) {
+ LOG.info("Missing action node.");
+ return;
+ }
+
+ if(!MODIFY_CONFIG_ANR.equals(action.textValue())) {
+ LOG.info("Unknown Action {}", action);
+ return;
+ }
+
+ JsonNode payload = input.get(PAYLOAD);
+ if(payload == null) {
+ LOG.info("Missing payload node.");
+ return;
+ }
+
+ String payloadText = payload.asText();
+
+ if(!payloadText.contains(CONFIGURATIONS)) {
+ LOG.info("Missing configurations node.");
+ return;
+ }
+
+ JsonNode configurationsJsonNode;
+ try {
+ configurationsJsonNode = oMapper.readTree(payloadText);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse payload value", e);
+ }
+
+ String mapFilename = rootDir + ANR_CHANGES_MAP_FILE_NAME + ".map";
+ Map<String, String> fieldMap = loadMap(mapFilename);
+ if (fieldMap == null) {
+ return;
+ }
+
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ return;
+ }
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+
+ if (!fieldMap.containsKey(TEMPLATE)) {
+ throw new InvalidMessageException("No SDNC template known for message ");
+ }
+ String templateName = fieldMap.get(TEMPLATE);
+
+ JsonNode configurations = configurationsJsonNode.get(CONFIGURATIONS);
+
+ if(configurations.isArray()) {
+ for(JsonNode dataNode:configurations) {
+ if(dataNode.get(DATA).get(FAP_SERVICE) == null) {
+ LOG.info("Could not make a rpc call. Missing fapService node for dataNode element::", dataNode.textValue());
+ }else {
+ buildAndInvokeANRChangesRPC(sdncEndpoint, templateName,msg, dataNode);
+ }
+ }
+ }else {
+ throw new InvalidMessageException("Configurations is not of Type Array. Could not read configuration changes");
+ }
+ }
+
+ private void invokePCIChangesConsumer(JsonNode dmaapMessageRootNode, ObjectMapper oMapper,
+ String msg) throws InvalidMessageException {
+ JsonNode body = dmaapMessageRootNode.get(BODY);
+ if(body == null) {
+ LOG.info("Missing body node.");
+ return;
+ }
+
+ JsonNode input = body.get(INPUT);
+ if(input == null) {
+ LOG.info("Missing input node.");
+ return;
+ }
+
+ JsonNode action = input.get(ACTION);
+ if(action == null) {
+ LOG.info("Missing action node.");
+ return;
+ }
+
+
+ if(!MODIFY_CONFIG.equals(action.textValue())) {
+ LOG.info("Unknown Action {}", action);
+ return;
+ }
+
+ JsonNode payload = input.get(PAYLOAD);
+ if(payload == null) {
+ LOG.info("Missing payload node.");
+ return;
+ }
+
+ String configurations = payload.asText();
+
+ if(!configurations.contains(CONFIGURATIONS)) {
+ LOG.info("Missing configurations node.");
+ return;
+ }
+
+ JsonNode configurationsJsonNode;
+ try {
+ configurationsJsonNode = oMapper.readTree(configurations);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse payload value", e);
+ }
+
+ String mapFilename = rootDir + PCI_CHANGES_MAP_FILE_NAME + ".map";
+ Map<String, String> fieldMap = loadMap(mapFilename);
+ if (fieldMap == null) {
+ return;
+ }
+
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ return;
+ }
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+
+ if (!fieldMap.containsKey(TEMPLATE)) {
+ throw new InvalidMessageException("No SDNC template known for message ");
+ }
+ String templateName = fieldMap.get(TEMPLATE);
+
+ buildAndInvokePCIChangesRPC(sdncEndpoint, templateName, msg, configurationsJsonNode);
+ }
+
+ private void buildAndInvokePCIChangesRPC(String sdncEndpoint, String templateName, String msg, JsonNode configurationsOrDataNode) {
+ try {
+ String rpcMsgbody = publish(templateName, msg, configurationsOrDataNode, true, false);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+
+ private void buildAndInvokeANRChangesRPC(String sdncEndpoint, String templateName, String msg, JsonNode configurationsOrDataNode) {
+ try {
+ String rpcMsgbody = publish(templateName, msg, configurationsOrDataNode, false, true);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+
+ private Map<String, String> loadMap(String mapFilename) {
+ File mapFile = new File(mapFilename);
+
+ if (!mapFile.canRead()) {
+ LOG.error(String.format("Cannot read map file (%s)", mapFilename));
+ return null;
+ }
+
+ Map<String, String> results = new HashMap<>();
+ try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
+
+ String curLine;
+
+ while ((curLine = mapReader.readLine()) != null) {
+ curLine = curLine.trim();
+
+ if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
+ String[] entry = curLine.split("=>");
+ if (entry.length == 2) {
+ results.put(entry[0].trim(), entry[1].trim());
+ }
+ }
+ }
+ mapReader.close();
+ } catch (Exception e) {
+ LOG.error("Caught exception reading map " + mapFilename, e);
+ return null;
+ }
+
+ return results;
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
new file mode 100644
index 000000000..f35e6f1e3
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
@@ -0,0 +1,280 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SdncAaiDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SdncAaiDmaapConsumer.class);
+ private static final String SDNC_ENDPOINT = "SDNC.endpoint";
+ private static final String TEMPLATE = "SDNC.template";
+ private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
+
+ private static final String ESR_SYSTEM_INFO = "esr-system-info";
+ private static final String RELATIONSHIP_LIST = "relationship-list";
+ private static final String ESR_SYSTEM_INFO_LIST = "esr-system-info-list";
+ private static final String AAI_EVENT = "AAI-EVENT";
+
+ private static final String EVENT_TYPE = "event-type";
+ private static final String ENTITY = "entity";
+ private static final String ENTITY_TYPE = "entity-type";
+ private static final String EVENT_HEADER = "event-header";
+
+ private String rootDir;
+
+ protected VelocityEngine velocityEngine;
+
+ public SdncAaiDmaapConsumer() {
+ velocityEngine = new VelocityEngine();
+ Properties props = new Properties();
+ rootDir = System.getenv(DMAAPLISTENERROOT);
+
+ if ((rootDir == null) || (rootDir.length() == 0)) {
+ rootDir = "/opt/onap/sdnc/dmaap-listener/lib/";
+ }
+ else {
+ rootDir = rootDir + "/lib/";
+ }
+
+ props.put("file.resource.loader.path", rootDir);
+ velocityEngine.init(props);
+ }
+
+ /*
+ * for testing purposes
+ */
+ SdncAaiDmaapConsumer(Properties props) {
+ velocityEngine = new VelocityEngine();
+ velocityEngine.init(props);
+ }
+
+ protected String publish(String templatePath, String jsonString) throws IOException
+ {
+ if (templatePath.contains("esr-thirdparty-sdnc")){
+ return publishEsrThirdPartySdnc(templatePath, jsonString);
+ } else {
+ return publishFullMessage(templatePath, jsonString);
+ }
+ }
+
+ private String publishFullMessage(String templatePath, String jsonString) throws IOException
+ {
+ JSONObject jsonObj = new JSONObject(jsonString);
+ VelocityContext context = new VelocityContext();
+ for(Object key : jsonObj.keySet())
+ {
+ context.put((String)key, jsonObj.get((String)key));
+ }
+
+ String id = jsonObj.getJSONObject(EVENT_HEADER).get("id").toString();
+ context.put("req_id", id);
+
+ context.put("curr_time", Instant.now());
+
+ ObjectMapper oMapper = new ObjectMapper();
+
+ String rpcMsgbody = oMapper.writeValueAsString(jsonString);
+ context.put("full_message", rpcMsgbody);
+
+ Writer writer = new StringWriter();
+ velocityEngine.mergeTemplate(templatePath, "UTF-8", context, writer);
+ writer.flush();
+
+ return writer.toString();
+ }
+
+ private String publishEsrThirdPartySdnc(String templatePath, String jsonString) throws IOException
+ {
+ JSONObject jsonObj = new JSONObject(jsonString);
+ VelocityContext context = new VelocityContext();
+
+ JSONObject eventHeader = jsonObj.getJSONObject(EVENT_HEADER);
+ for(Object key : eventHeader.keySet())
+ {
+ if (!("action").equals(key)) {
+ context.put(((String)key).replaceAll("-", ""), eventHeader.get((String)key));
+ } else {
+ String action = (String) eventHeader.get((String) key);
+ if (("create").equalsIgnoreCase(action)) {
+ context.put((String)key,"Update");
+ } else if (("delete").equalsIgnoreCase(action)) {
+ context.put((String) key, "Delete");
+ } else {
+ throw new IOException("Action type not supported " + action);
+ }
+ }
+ }
+
+ JSONObject entityObj = jsonObj.getJSONObject(ENTITY);
+ for(Object key : entityObj.keySet())
+ {
+ switch((String)key)
+ {
+ case ESR_SYSTEM_INFO_LIST :
+ JSONArray esrSystemInfo = entityObj.getJSONObject((String)key)
+ .getJSONArray(ESR_SYSTEM_INFO);
+
+ for (int i = 0; i < esrSystemInfo.length(); i++) {
+ JSONObject objects = esrSystemInfo.getJSONObject(i);
+
+ for (Object name : objects.keySet()) {
+ context.put(((String)name).replaceAll("-", ""),
+ objects.get((String)name).toString());
+ }
+ }
+ break;
+
+ case RELATIONSHIP_LIST :
+ //convertion not required for relationship
+ break;
+
+ default :
+ context.put(((String)key).replaceAll("-", ""),
+ entityObj.get((String)key).toString());
+ break;
+ }
+ }
+
+ Writer writer = new StringWriter();
+ velocityEngine.mergeTemplate(templatePath, "UTF-8", context, writer);
+ writer.flush();
+
+ return writer.toString();
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode aaiRootNode;
+ try {
+ aaiRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+ JsonNode eventHeaderNode = aaiRootNode.get(EVENT_HEADER);
+ if(eventHeaderNode == null) {
+ LOG.info("Missing Event Header node.");
+ return;
+ }
+ JsonNode eventTypeNode = eventHeaderNode.get(EVENT_TYPE);
+ String eventType = eventTypeNode.textValue();
+
+ if(!AAI_EVENT.equals(eventType)) {
+ LOG.info("Unknown Event Type {}", eventType);
+ return;
+ }
+
+ JsonNode entityTypeNode = eventHeaderNode.get(ENTITY_TYPE);
+ String entityType = entityTypeNode.textValue();
+
+ String mapFilename = rootDir + entityType + ".map";
+ Map<String, String> fieldMap = loadMap(mapFilename);
+ if (fieldMap == null) {
+ return;
+ }
+
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ return;
+ }
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+
+ if (!fieldMap.containsKey(TEMPLATE)) {
+ throw new InvalidMessageException("No SDNC template known for message " + entityType);
+ }
+ String templateName = fieldMap.get(TEMPLATE);
+
+ try {
+ String rpcMsgbody = publish(templateName, msg);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+
+ private Map<String, String> loadMap(String mapFilename) {
+ File mapFile = new File(mapFilename);
+
+ if (!mapFile.canRead()) {
+ LOG.error(String.format("Cannot read map file (%s)", mapFilename));
+ return null;
+ }
+
+ Map<String, String> results = new HashMap<>();
+ try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
+
+ String curLine;
+
+ while ((curLine = mapReader.readLine()) != null) {
+ curLine = curLine.trim();
+
+ if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
+ String[] entry = curLine.split("=>");
+ if (entry.length == 2) {
+ results.put(entry[0].trim(), entry[1].trim());
+ }
+ }
+ }
+ mapReader.close();
+ } catch (Exception e) {
+ LOG.error("Caught exception reading map " + mapFilename, e);
+ return null;
+ }
+
+ return results;
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
new file mode 100644
index 000000000..7b68ceb63
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
@@ -0,0 +1,130 @@
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.onap.ccsdk.sli.core.dblib.DBResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SdncDhcpEventConsumer extends SdncDmaapConsumerImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(SdncDhcpEventConsumer.class);
+
+ private static final String MAC_ADDR_TAG = "macaddr";
+ private static final String MSG_NAME_TAG = "msg_name";
+ private static final String IP_ADDR_TAG = "yiaddr";
+
+ private static DBResourceManager jdbcDataSource = null;
+ private static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
+
+ private class MissingDhcpAttributeException extends InvalidMessageException {
+
+ public MissingDhcpAttributeException(String fieldName) {
+ super("Invalid DHCP event - missing " + fieldName + " attribute");
+ }
+ }
+
+ private static void setJdbcDataSource() throws IOException {
+
+ String propPath;
+ String propDir = System.getenv(SDNC_CONFIG_DIR);
+ if (propDir == null) {
+ propDir = "/opt/onap/sdnc/data/properties";
+ }
+ propPath = propDir + "/dblib.properties";
+ File propFile = new File(propPath);
+
+ if (!propFile.exists()) {
+
+ throw new FileNotFoundException("Missing configuration properties file : " + propFile);
+ }
+
+ Properties props = new Properties();
+ props.load(new FileInputStream(propFile));
+
+ setJdbcDataSource(new DBResourceManager(props));
+
+ }
+
+ static void setJdbcDataSource(DBResourceManager dbMgr) {
+
+ jdbcDataSource = dbMgr;
+
+ if (jdbcDataSource.isActive()) {
+ LOG.warn("DBLIB: JDBC DataSource has been initialized.");
+ } else {
+ LOG.warn("DBLIB: JDBC DataSource did not initialize successfully.");
+ }
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+
+ JsonNode dhcpRootNode;
+ String msgName;
+ String macAddr;
+ String ipAddr;
+
+ try {
+ dhcpRootNode = oMapper.readTree(msg);
+
+ } catch (IOException e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+ JsonNode msgNameNode = dhcpRootNode.get(MSG_NAME_TAG);
+ if (msgNameNode != null) {
+ msgName = msgNameNode.textValue();
+
+ } else {
+ throw new MissingDhcpAttributeException(MSG_NAME_TAG);
+ }
+
+ JsonNode macAddrNode = dhcpRootNode.get(MAC_ADDR_TAG);
+ if (macAddrNode != null) {
+ macAddr = macAddrNode.textValue();
+
+ } else {
+ throw new MissingDhcpAttributeException(MAC_ADDR_TAG);
+ }
+
+ JsonNode ipAddrNode = dhcpRootNode.get(IP_ADDR_TAG);
+ if (ipAddrNode != null) {
+ ipAddr = ipAddrNode.textValue();
+
+ } else {
+ throw new MissingDhcpAttributeException(IP_ADDR_TAG);
+ }
+
+ LOG.debug("Got DHCP event : msg name {}; mac addr {}; ip addr {}", msgName, macAddr, ipAddr);
+
+ if (jdbcDataSource == null) {
+ try {
+ setJdbcDataSource();
+ } catch (IOException e) {
+ LOG.error("Could not create JDBC connection", e);
+ return;
+ }
+ }
+
+ try {
+
+ jdbcDataSource.writeData("INSERT INTO DHCP_MAP(mac_addr, ip_addr) VALUES('" + macAddr + "','" + ipAddr + "') ON DUPLICATE KEY UPDATE ip_addr = '"+ipAddr+"'", null, null);
+
+ } catch (SQLException e) {
+ LOG.error("Could not insert DHCP event data into the database ", e);
+ }
+
+ }
+
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
new file mode 100644
index 000000000..3fc769d35
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.util.Properties;
+
+public abstract interface SdncDmaapConsumer extends Runnable {
+ public abstract void init(Properties baseProperties, String consumerPropertiesPath);
+
+ public abstract void processMsg(String msg) throws InvalidMessageException;
+
+ public abstract boolean isReady();
+
+ public abstract boolean isRunning();
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
new file mode 100644
index 000000000..ee8bb4d6e
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
@@ -0,0 +1,160 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+
+public abstract class SdncDmaapConsumerImpl implements SdncDmaapConsumer {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SdncDmaapConsumer.class);
+
+ private final String name = this.getClass().getSimpleName();
+ private Properties properties = null;
+ private MRConsumer consumer = null;
+ private MRConsumerResponse consumerResponse = null;
+ private boolean running = false;
+ private boolean ready = false;
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+ private int timeout = 15000; // Default timeout - 15 seconds
+
+ public SdncDmaapConsumerImpl() {
+
+ }
+
+ public SdncDmaapConsumerImpl(Properties properties, String propertiesPath) {
+ init(properties, propertiesPath);
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ public String getProperty(String name) {
+ return properties.getProperty(name, "");
+ }
+
+ public void init(Properties properties, String propertiesPath) {
+
+ try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
+
+ LOG.debug("propertiesPath: " + propertiesPath);
+ this.properties = (Properties) properties.clone();
+ this.properties.load(in);
+
+
+ String timeoutStr = this.properties.getProperty("timeout");
+ LOG.debug("timeoutStr: " + timeoutStr);
+
+ if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
+ timeout = parseTimeOutValue(timeoutStr);
+ }
+
+ String fetchPauseStr = this.properties.getProperty("fetchPause");
+ LOG.debug("fetchPause(Str): " + fetchPauseStr);
+ if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
+ fetchPause = parseFetchPause(fetchPauseStr);
+ }
+ LOG.debug("fetchPause: " + fetchPause);
+
+
+ this.consumer = MRClientFactory.createConsumer(propertiesPath);
+ ready = true;
+ } catch (Exception e) {
+ LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
+ }
+ }
+
+ private int parseTimeOutValue(String timeoutStr) {
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
+ }
+ return timeout;
+ }
+
+ private int parseFetchPause(String fetchPauseStr) {
+ try {
+ return Integer.parseInt(fetchPauseStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
+ }
+ return fetchPause;
+ }
+
+
+ @Override
+ public void run() {
+ if (ready) {
+
+ running = true;
+
+ while (running) {
+
+ try {
+ boolean noData = true;
+ consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
+ for (String msg : consumerResponse.getActualMessages()) {
+ noData = false;
+ LOG.info(name + " received ActualMessage from DMaaP:\n"+msg);
+ processMsg(msg);
+ }
+
+ if (noData) {
+ LOG.info(name + " received ResponseCode: " + consumerResponse.getResponseCode());
+ LOG.info(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
+ pauseThread();
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception reading from DMaaP", e);
+ running = false;
+ }
+
+
+ }
+ }
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.info(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.info("No data received from fetch. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ public abstract void processMsg(String msg) throws InvalidMessageException;
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
new file mode 100644
index 000000000..6c90c7199
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
@@ -0,0 +1,170 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+
+public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
+
+ private static final String DMAAPLISTENERROOT = "DMAAPLISTENERROOT";
+ private static final String SDNC_ENDPOINT = "SDNC.endpoint";
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ processMsg(msg, null);
+ }
+
+ public void processMsg(String msg, String mapDirName) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode instarRootNode;
+ ObjectNode sdncRootNode;
+
+ String instarMsgName = null;
+
+ try {
+ instarRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse json object", e);
+ }
+
+ Iterator<Map.Entry<String, JsonNode>> instarFields = instarRootNode.fields();
+
+ while (instarFields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = instarFields.next();
+
+ instarMsgName = entry.getKey();
+ instarRootNode = entry.getValue();
+ break;
+ }
+
+ Map<String, String> fieldMap = loadMap(instarMsgName, mapDirName);
+
+ if (fieldMap == null) {
+ throw new InvalidMessageException("Unable to process message - cannot load field mappings");
+ }
+
+ if (!fieldMap.containsKey(SDNC_ENDPOINT)) {
+ throw new InvalidMessageException("No SDNC endpoint known for message " + instarMsgName);
+ }
+
+ String sdncEndpoint = fieldMap.get(SDNC_ENDPOINT);
+
+ sdncRootNode = oMapper.createObjectNode();
+ ObjectNode inputNode = oMapper.createObjectNode();
+
+ for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
+
+ if (!SDNC_ENDPOINT.equals(entry.getKey())) {
+ JsonNode curNode = instarRootNode.get(entry.getKey());
+ if (curNode != null) {
+ String fromValue = curNode.textValue();
+
+ inputNode.put(entry.getValue(), fromValue);
+ }
+ }
+ }
+ sdncRootNode.put("input", inputNode);
+
+ try {
+ String rpcMsgbody = oMapper.writeValueAsString(sdncRootNode);
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.info("POST message body would be:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+
+ private Map<String, String> loadMap(String msgType, String mapDirName) {
+ Map<String, String> results = new HashMap<>();
+
+ String dirName = mapDirName;
+
+ if (mapDirName == null) {
+ String rootdir = System.getenv(DMAAPLISTENERROOT);
+
+ if ((rootdir == null) || (rootdir.length() == 0)) {
+ rootdir = "/opt/app/dmaap-listener";
+ }
+
+ dirName = rootdir + "/lib";
+ }
+
+ String mapFilename = dirName + "/" + msgType + ".map";
+
+ File mapFile = new File(mapFilename);
+
+ if (!mapFile.canRead()) {
+ LOG.error(String.format("Cannot read map file (%s)", mapFilename));
+ return null;
+ }
+
+ try (BufferedReader mapReader = new BufferedReader(new FileReader(mapFile))) {
+
+ String curLine;
+
+ while ((curLine = mapReader.readLine()) != null) {
+ curLine = curLine.trim();
+
+ if ((curLine.length() > 0) && (!curLine.startsWith("#")) && curLine.contains("=>")) {
+ String[] entry = curLine.split("=>");
+ if (entry.length == 2) {
+ results.put(entry[0].trim(), entry[1].trim());
+ }
+ }
+ }
+ mapReader.close();
+ } catch (Exception e) {
+ LOG.error("Caught exception reading map " + mapFilename, e);
+ return null;
+ }
+
+ return results;
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
new file mode 100644
index 000000000..f2153789f
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
@@ -0,0 +1,93 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SdncLcmDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SdncLcmDmaapConsumer.class);
+
+ private static final String BODY = "body";
+ private static final String RPC = "rpc-name";
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null LCM message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode lcmRootNode;
+ try {
+ lcmRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse LCM json input", e);
+ }
+
+ JsonNode bodyNode = lcmRootNode.get(BODY);
+ if(bodyNode == null) {
+ LOG.warn("Missing body in LCM message");
+ return;
+ }
+ String rpcMsgbody;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ rpcMsgbody = mapper.writeValueAsString(bodyNode);
+
+ } catch (Exception e) {
+ LOG.error("Unable to parse body in LCM message", e);
+ return;
+ }
+
+ JsonNode rpcNode = lcmRootNode.get(RPC);
+ if(rpcNode == null) {
+ LOG.warn("Missing node in LCM message- " + RPC);
+ return;
+ }
+ String rpc = rpcNode.textValue();
+ String sdncEndpoint = "LCM:" + rpc;
+
+ try {
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+ LOG.info("POST LCM Request " + rpcMsgbody);
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.warn("Unable to POST LCM message. SDNC URL not available. body:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+}
+
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java
new file mode 100644
index 000000000..6eeef9b48
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncOdlConnection.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Authenticator;
+import java.net.HttpURLConnection;
+import java.net.PasswordAuthentication;
+import java.net.URL;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSession;
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SdncOdlConnection {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SdncOdlConnection.class);
+
+ private HttpURLConnection httpConn = null;
+
+ private String url = null;
+ private String user = null;
+ private String password = null;
+
+ private class SdncAuthenticator extends Authenticator {
+
+ private String user;
+ private String passwd;
+
+ SdncAuthenticator(String user, String passwd) {
+ this.user = user;
+ this.passwd = passwd;
+ }
+
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(user, passwd.toCharArray());
+ }
+ }
+
+ private SdncOdlConnection() {
+
+ }
+
+ private SdncOdlConnection(String url, String user, String password) {
+ this.url = url;
+ this.user = user;
+ this.password = password;
+
+ try {
+ URL sdncUrl = new URL(url);
+ Authenticator.setDefault(new SdncAuthenticator(user, password));
+
+ this.httpConn = (HttpURLConnection) sdncUrl.openConnection();
+ } catch (Exception e) {
+ LOG.error("Unable to create http connection", e);
+ }
+ }
+
+ public static SdncOdlConnection newInstance(String url, String user, String password) {
+ return new SdncOdlConnection(url, user, password);
+ }
+
+
+ public String send(String method, String contentType, String msg) throws IOException {
+
+ LOG.info(String.format("Sending REST %s to %s", method, url));
+ LOG.info(String.format("Message body:%n%s", msg));
+ String authStr = user + ":" + password;
+ String encodedAuthStr = new String(Base64.encodeBase64(authStr.getBytes()));
+
+ httpConn.addRequestProperty("Authentication", "Basic " + encodedAuthStr);
+
+ httpConn.setRequestMethod(method);
+ httpConn.setRequestProperty("Content-Type", contentType);
+ httpConn.setRequestProperty("Accept", contentType);
+
+ httpConn.setDoInput(true);
+ httpConn.setDoOutput(true);
+ httpConn.setUseCaches(false);
+
+ if (httpConn instanceof HttpsURLConnection) {
+ HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
+ };
+ ((HttpsURLConnection) httpConn).setHostnameVerifier(hostnameVerifier);
+ }
+
+ // Write message
+ httpConn.setRequestProperty("Content-Length", Integer.toString(msg.length()));
+ DataOutputStream outStr = new DataOutputStream(httpConn.getOutputStream());
+ outStr.write(msg.getBytes());
+ outStr.close();
+
+ // Read response
+ BufferedReader respRdr;
+
+ LOG.info("Response: " + httpConn.getResponseCode() + " " + httpConn.getResponseMessage());
+
+ if (httpConn.getResponseCode() < 300) {
+
+ respRdr = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
+ } else {
+ respRdr = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+ }
+
+ StringBuilder respBuff = new StringBuilder();
+
+ String respLn;
+
+ while ((respLn = respRdr.readLine()) != null) {
+ respBuff.append(respLn).append("\n");
+ }
+ respRdr.close();
+
+ String respString = respBuff.toString();
+
+ LOG.info(String.format("Response body :%n%s", respString));
+
+ return respString;
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncRANSliceDmaapConsumer.java b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncRANSliceDmaapConsumer.java
new file mode 100644
index 000000000..f1749f99b
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncRANSliceDmaapConsumer.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SdncRANSliceDmaapConsumer extends SdncDmaapConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SdncRANSliceDmaapConsumer.class);
+
+ private static final String BODY = "body";
+ private static final String RPC = "rpc-name";
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException {
+
+ if (msg == null) {
+ throw new InvalidMessageException("Null RANSlice message");
+ }
+
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode ranSliceRootNode;
+ try {
+ ranSliceRootNode = oMapper.readTree(msg);
+ } catch (Exception e) {
+ throw new InvalidMessageException("Cannot parse RANSlice json input", e);
+ }
+
+ JsonNode bodyNode = ranSliceRootNode.get(BODY);
+ if(bodyNode == null) {
+ LOG.warn("Missing body in RANSlice message");
+ return;
+ }
+ String rpcMsgbody;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ rpcMsgbody = mapper.writeValueAsString(bodyNode);
+
+ } catch (Exception e) {
+ LOG.error("Unable to parse body in RANSlice message", e);
+ return;
+ }
+
+ JsonNode rpcNode = ranSliceRootNode.get(RPC);
+ if(rpcNode == null) {
+ LOG.warn("Missing node in RANSlice message- " + RPC);
+ return;
+ }
+ String rpc = rpcNode.textValue();
+ String sdncEndpoint = "ran-slice-api:" + rpc;
+
+ try {
+ String odlUrlBase = getProperty("sdnc.odl.url-base");
+ String odlUser = getProperty("sdnc.odl.user");
+ String odlPassword = getProperty("sdnc.odl.password");
+ LOG.info("POST RANSlice Request " + rpcMsgbody);
+ if ((odlUrlBase != null) && (odlUrlBase.length() > 0)) {
+ SdncOdlConnection conn = SdncOdlConnection.newInstance(odlUrlBase + "/" + sdncEndpoint, odlUser, odlPassword);
+
+ conn.send("POST", "application/json", rpcMsgbody);
+ } else {
+ LOG.warn("Unable to POST RANSlice message. SDNC URL not available. body:\n" + rpcMsgbody);
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to process message", e);
+ }
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/resources/anr-changes-from-policy-to-sdnr.map b/northbound/dmaap-listener/src/main/resources/anr-changes-from-policy-to-sdnr.map
new file mode 100644
index 000000000..5cd2b8371
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/anr-changes-from-policy-to-sdnr.map
@@ -0,0 +1,5 @@
+# SDN-C URL
+SDNC.endpoint => SLI-API:execute-graph
+
+# Field mapping
+SDNC.template => anr-pci-changes-from-policy-to-sdnr.vt
diff --git a/northbound/dmaap-listener/src/main/resources/anr-pci-changes-from-policy-to-sdnr.vt b/northbound/dmaap-listener/src/main/resources/anr-pci-changes-from-policy-to-sdnr.vt
new file mode 100644
index 000000000..1ded88dc7
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/anr-pci-changes-from-policy-to-sdnr.vt
@@ -0,0 +1,8 @@
+{
+ "input": {
+ "module-name": "oofpcipoc-api",
+ "rpc-name": "$rpc_name",
+ "mode": "sync",
+ "sli-parameter": $sli_parameters
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/resources/edgeRouterStatusChange.map b/northbound/dmaap-listener/src/main/resources/edgeRouterStatusChange.map
new file mode 100644
index 000000000..fa5ff0784
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/edgeRouterStatusChange.map
@@ -0,0 +1,44 @@
+# SDN-C URL
+SDNC.endpoint => FLOWRED-API:process-edge-router-status-change
+
+# Field mapping
+equip_id => equip_id
+ptnii_equip_name => equip-name
+equip_type.equip_type => equip-type
+ip_addr => loopback0
+router_prov_status => prov-status
+country.region => region
+country_abbr => country
+equip_name_code => equip-name-code
+as_number => as-number
+loopback1 => loopback1
+loopback2 => loopback2
+loopback3 => loopback3
+loopback40 => loopback40
+loopback65535 => loopback65535
+inms_list => inms-list
+encrypted_access_flag => encrypted-access-flag
+sw_name => sw-name
+nmipaddr => nm-addr
+function_code => function-code
+
+EquipmentId => equip-id
+PTNIIEquipmentName => equip-name
+EquipmentType => equip-type
+Loopback0 => loopback0
+RouterStatus => prov-status
+Region => region
+CountryAbbreviation => country
+EquipmentNameCode => equip-name-code
+ASNumber => as-number
+SoftwareName => sw-name
+NetworkManagementIPAddress => nm-addr
+FunctionCode => function-code
+Loopback1 => loopback1
+Loopback2 => loopback2
+Loopback3 => loopback3
+Loopback40 => loopback40
+Loopback65535 => loopback65535
+InmsList => inms-list
+EncryptedAccessFlag => encrypted-access-flag
+
diff --git a/northbound/dmaap-listener/src/main/resources/esr-thirdparty-sdnc.map b/northbound/dmaap-listener/src/main/resources/esr-thirdparty-sdnc.map
new file mode 100644
index 000000000..603645ebe
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/esr-thirdparty-sdnc.map
@@ -0,0 +1,5 @@
+# SDN-C URL
+SDNC.endpoint => DataChange:data-change-notification
+
+# Field mapping
+SDNC.template => template-esr-thirdparty-sdnc.vt
diff --git a/northbound/dmaap-listener/src/main/resources/generic-vnf.map b/northbound/dmaap-listener/src/main/resources/generic-vnf.map
new file mode 100644
index 000000000..bc375eb28
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/generic-vnf.map
@@ -0,0 +1,5 @@
+# SDN-C URL
+SDNC.endpoint => config-selfservice-api:notification-callback
+
+# Field mapping
+SDNC.template => template-generic-vnf.vt
diff --git a/northbound/dmaap-listener/src/main/resources/log4j2.xml b/northbound/dmaap-listener/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..01ee93714
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/log4j2.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<Configuration status="debug">
+ <Properties>
+ <Property name="logDir">$${env:LOGDIR:-logs}</Property>
+ </Properties>
+ <Appenders>
+ <RollingFile name="LOGFILE" fileName="${logDir}/dmaap-listener.log"
+ filePattern="${logDir}/dmaap-listener-%i.log">
+ <PatternLayout pattern="%p %d{yyyy-MM-dd HH:mm:ss.SSS Z} %c{1} - %m%n" />
+ <Policies>
+ <SizeBasedTriggeringPolicy size="10 MB" />
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
+ <Console name="CONSOLE" target="SYSTEM_OUT">
+ <PatternLayout pattern="%p %d{yyyy-MM-dd HH:mm:ss.SSS Z} %c{1} - %m%n" />
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="debug">
+ <AppenderRef ref="LOGFILE"/>
+ <AppenderRef ref="CONSOLE"/>
+ </Root>
+ </Loggers>
+</Configuration> \ No newline at end of file
diff --git a/northbound/dmaap-listener/src/main/resources/pci-changes-from-policy-to-sdnr.map b/northbound/dmaap-listener/src/main/resources/pci-changes-from-policy-to-sdnr.map
new file mode 100644
index 000000000..5cd2b8371
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/pci-changes-from-policy-to-sdnr.map
@@ -0,0 +1,5 @@
+# SDN-C URL
+SDNC.endpoint => SLI-API:execute-graph
+
+# Field mapping
+SDNC.template => anr-pci-changes-from-policy-to-sdnr.vt
diff --git a/northbound/dmaap-listener/src/main/resources/preferredRoute.txt b/northbound/dmaap-listener/src/main/resources/preferredRoute.txt
new file mode 100644
index 000000000..662b0aa7d
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/preferredRoute.txt
@@ -0,0 +1 @@
+preferredRouteKey=MR1 \ No newline at end of file
diff --git a/northbound/dmaap-listener/src/main/resources/pserver.map b/northbound/dmaap-listener/src/main/resources/pserver.map
new file mode 100644
index 000000000..8b3e463e7
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/pserver.map
@@ -0,0 +1,5 @@
+# SDN-C URL
+SDNC.endpoint => config-selfservice-api:notification-callback
+
+# Field mapping
+SDNC.template => template-pserver.vt
diff --git a/northbound/dmaap-listener/src/main/resources/template-esr-thirdparty-sdnc.vt b/northbound/dmaap-listener/src/main/resources/template-esr-thirdparty-sdnc.vt
new file mode 100644
index 000000000..225fce026
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/template-esr-thirdparty-sdnc.vt
@@ -0,0 +1,100 @@
+{
+ "DataChange:input": {
+ "DataChange:aai-node-type": "$entitytype",
+ "DataChange:selflink": "$entitylink",
+ "DataChange:aai-event-id": "$id",
+ "DataChange:aai-event-trigger": "$action",
+ "DataChange:key-data": [{
+ "DataChange:key-name": "thirdparty-sdnc-id",
+ "DataChange:key-value": "$thirdpartysdncid"
+ },
+ {
+ "DataChange:key-name": "resource-version",
+ "DataChange:key-value": "$resourceversion"
+ },
+ {
+ "DataChange:key-name": "location",
+ "DataChange:key-value": "$location"
+ },
+ {
+ "DataChange:key-name": "product-name",
+ "DataChange:key-value": "$productname"
+ },
+ {
+ "DataChange:key-name": "esr-system-info-id",
+ "DataChange:key-value": "$esrsysteminfoid"
+ },
+ {
+ "DataChange:key-name": "system-type",
+ "DataChange:key-value": "$systemtype"
+ },
+ {
+ "DataChange:key-name": "service-url",
+ "DataChange:key-value": "$serviceurl"
+ },
+ {
+ "DataChange:key-name": "ssl-cacert",
+ "DataChange:key-value": "$sslcacert"
+ },
+ {
+ "DataChange:key-name": "type",
+ "DataChange:key-value": "$type"
+ },
+ {
+ "DataChange:key-name": "ssl-insecure",
+ "DataChange:key-value": "$sslinsecure"
+ },
+ {
+ "DataChange:key-name": "system-status",
+ "DataChange:key-value": "$systemstatus"
+ },
+ {
+ "DataChange:key-name": "version",
+ "DataChange:key-value": "$version"
+ },
+ {
+ "DataChange:key-name": "passive",
+ "DataChange:key-value": "$passive"
+ },
+ {
+ "DataChange:key-name": "password",
+ "DataChange:key-value": "$password"
+ },
+ {
+ "DataChange:key-name": "protocol",
+ "DataChange:key-value": "$protocol"
+ },
+ {
+ "DataChange:key-name": "ip-address",
+ "DataChange:key-value": "$ipaddress"
+ },
+ {
+ "DataChange:key-name": "cloud-domain",
+ "DataChange:key-value": "$clouddomain"
+ },
+ {
+ "DataChange:key-name": "user-name",
+ "DataChange:key-value": "$username"
+ },
+ {
+ "DataChange:key-name": "system-name",
+ "DataChange:key-value": "$systemname"
+ },
+ {
+ "DataChange:key-name": "port",
+ "DataChange:key-value": "$port"
+ },
+ {
+ "DataChange:key-name": "vendor",
+ "DataChange:key-value": "$vendor"
+ },
+ {
+ "DataChange:key-name": "remote-path",
+ "DataChange:key-value": "$remotepath"
+ },
+ {
+ "DataChange:key-name": "default-tenant",
+ "DataChange:key-value": "$defaulttenant"
+ }]
+ }
+}
diff --git a/northbound/dmaap-listener/src/main/resources/template-generic-vnf.vt b/northbound/dmaap-listener/src/main/resources/template-generic-vnf.vt
new file mode 100644
index 000000000..d57c9a0a4
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/template-generic-vnf.vt
@@ -0,0 +1,11 @@
+{
+ "input": {
+ "common-header": {
+ "request-id": "$req_id",
+ "timestamp": "$curr_time",
+ "originator-id": "AAI",
+ "api-ver": "2.00"
+ },
+ "payload": $full_message
+ }
+} \ No newline at end of file
diff --git a/northbound/dmaap-listener/src/main/resources/template-pserver.vt b/northbound/dmaap-listener/src/main/resources/template-pserver.vt
new file mode 100644
index 000000000..d57c9a0a4
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/resources/template-pserver.vt
@@ -0,0 +1,11 @@
+{
+ "input": {
+ "common-header": {
+ "request-id": "$req_id",
+ "timestamp": "$curr_time",
+ "originator-id": "AAI",
+ "api-ver": "2.00"
+ },
+ "payload": $full_message
+ }
+} \ No newline at end of file
diff --git a/northbound/dmaap-listener/src/main/scripts/start-dmaap-listener.sh b/northbound/dmaap-listener/src/main/scripts/start-dmaap-listener.sh
new file mode 100644
index 000000000..f767d3cd1
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/scripts/start-dmaap-listener.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+
+###
+# ============LICENSE_START=======================================================
+# openECOMP : SDN-C
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights
+# reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+PROPERTY_DIR=${PROPERTY_DIR:-/opt/onap/ccsdk/data/properties}
+
+LISTENER=dmaap-listener
+
+
+
+PIDFILE=/tmp/.${LISTENER}-pid
+UEBLISTENERROOT=${UEBLISTENERROOT:-/opt/onap/dmaap-listener}
+JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-8-oracle}
+JAVA_OPTS=${JAVA_OPTS:--Dhttps.protocols=TLSv1.1,TLSv1.2}
+JAVA=${JAVA:-${JAVA_HOME}/bin/java}
+
+# Redirect output from script to $LISTENER.out
+exec >> ${UEBLISTENERROOT}/logs/$LISTENER.out
+exec 2>&1
+
+if [ -f $PIDFILE ]
+then
+ pid=$(cat $PIDFILE)
+ if [ "$pid" != "" ]
+ then
+ if kill -0 $pid
+ then
+ echo "$LISTENER already running"
+ exit 0
+ fi
+ fi
+fi
+
+if [ ! -d ${UEBLISTENERROOT}/logs ]
+then
+ mkdir ${UEBLISTENERROOT}/logs
+fi
+
+for file in ${UEBLISTENERROOT}/lib/*.jar
+do
+ LISTENERCLASSPATH=$LISTENERCLASSPATH:$file
+done
+
+${JAVA} ${JAVA_OPTS} -Dlog4j.configuration=file:${UEBLISTENERROOT}/lib/log4j.properties -cp ${LISTENERCLASSPATH} org.onap.ccsdk.sli.northbound.dmaapclient.DmaapListener &
+
+
+echo $! > $PIDFILE
+
+echo "$LISTENER started!"
+exit 0
diff --git a/northbound/dmaap-listener/src/main/scripts/stop-dmaap-listener.sh b/northbound/dmaap-listener/src/main/scripts/stop-dmaap-listener.sh
new file mode 100644
index 000000000..ab242044f
--- /dev/null
+++ b/northbound/dmaap-listener/src/main/scripts/stop-dmaap-listener.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+###
+# ============LICENSE_START=======================================================
+# openECOMP : SDN-C
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights
+# reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+PROPERTY_DIR=${PROPERTY_DIR:-/opt/onap/ccsdk/data/properties}
+
+LISTENER=dmaap-listener
+
+
+PIDFILE=/tmp/.${LISTENER}-pid
+UEBLISTENERROOT=${UEBLISTENERROOT:-/opt/onap/dmaap-listener}
+
+if [ -f $PIDFILE ]
+then
+ pid=$(cat $PIDFILE)
+ if [ "$pid" != "" ]
+ then
+ if kill -0 $pid
+ then
+ echo "Stopping $LISTENER"
+ kill $pid && rm $PIDFILE
+ exit 0
+ else
+ echo "$LISTENER not running"
+ exit 1
+ fi
+ else
+ echo "$LISTENER not running"
+ exit 1
+ fi
+fi
+
+