summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider
diff options
context:
space:
mode:
authorJakub Dominik <j.dominik@samsung.com>2021-10-14 15:12:55 +0200
committerDan Timoney <dtimoney@att.com>2021-11-18 13:19:44 +0000
commit98aeaac496ac868a97ce9096c1c51ce9a133992a (patch)
treee3e4a52f786f4dbe271485366f0a0c8d74edc85f /sdnr/wt/mountpoint-registrar/provider
parent5d3bfaac4bc7a8b7b030757e1f35795667915a2b (diff)
Extend SDNC persistent service to store CM
Extend SDNC persistent service to store received CM events into Elasticsearch and MariaDB Issue-ID: CCSDK-3497 Signed-off-by: Jakub Dominik <j.dominik@samsung.com> Change-Id: I39983e59ef6512ad6c3864d47aebe1d615897146 Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java100
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java113
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java81
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java95
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java3
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java377
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java45
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java79
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java70
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java157
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_invalid_type.json49
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_attribute_value_changes.json45
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_creation.json42
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_deletion.json42
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_valid_two_element_moi_changes_array.json61
15 files changed, 1168 insertions, 191 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
new file mode 100644
index 000000000..98f02ec7a
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+public class CMBasicHeaderFieldsNotification {
+ private String cmNodeId;
+ private String cmSequence;
+ private String cmOccurrenceTime;
+ private String sourceId;
+ private String notificationType;
+
+ public static CMBasicHeaderFieldsNotificationBuilder builder() {
+ return new CMBasicHeaderFieldsNotificationBuilder();
+ }
+
+ private CMBasicHeaderFieldsNotification(
+ CMBasicHeaderFieldsNotification.CMBasicHeaderFieldsNotificationBuilder builder) {
+ this.cmNodeId = builder.cmNodeId;
+ this.cmSequence = builder.cmSequence;
+ this.cmOccurrenceTime = builder.cmOccurrenceTime;
+ this.sourceId = builder.sourceId;
+ this.notificationType = builder.notificationType;
+ }
+
+ public static class CMBasicHeaderFieldsNotificationBuilder {
+ private String cmNodeId;
+ private String cmSequence;
+ private String cmOccurrenceTime;
+ private String sourceId;
+ private String notificationType;
+
+ public CMBasicHeaderFieldsNotification build() {
+ return new CMBasicHeaderFieldsNotification(this);
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMNodeId(String cmNodeId) {
+ this.cmNodeId = cmNodeId;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMSequence(
+ String cmSequence) {
+ this.cmSequence = cmSequence;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMOccurrenceTime(
+ String cmOccurrenceTime) {
+ this.cmOccurrenceTime = cmOccurrenceTime;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withSourceId(String sourceId) {
+ this.sourceId = sourceId;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withNotificationType(
+ String notificationType) {
+ this.notificationType = notificationType;
+ return this;
+ }
+ }
+
+ public String getCmNodeId() {
+ return cmNodeId;
+ }
+
+ public String getCmSequence() {
+ return cmSequence;
+ }
+
+ public String getCmOccurrenceTime() {
+ return cmOccurrenceTime;
+ }
+
+ public String getNotificationType() {
+ return notificationType;
+ }
+
+ public String getSourceId() {
+ return sourceId;
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
new file mode 100644
index 000000000..014ff648d
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation;
+
+public class CMNotification {
+ private CMBasicHeaderFieldsNotification basicHeaderFields;
+ private String cmNotificationId;
+ private String cmSourceIndicator;
+ private String cmPath;
+ private String cmOperation;
+ private String cmValue;
+
+ public static CMNotificationBuilder builder() {
+ return new CMNotificationBuilder();
+ }
+
+ private CMNotification(CMNotificationBuilder builder) {
+ this.basicHeaderFields = builder.basicHeaderFields;
+ this.cmNotificationId = builder.cmNotificationId;
+ this.cmSourceIndicator = builder.cmSourceIndicator;
+ this.cmPath = builder.cmPath;
+ this.cmOperation = builder.cmOperation;
+ this.cmValue = builder.cmValue;
+ }
+
+ public static class CMNotificationBuilder {
+ private CMBasicHeaderFieldsNotification basicHeaderFields;
+ private String cmNotificationId;
+ private String cmSourceIndicator;
+ private String cmValue;
+ private String cmPath;
+
+ private String cmOperation = CmOperation.NULL.getName();
+
+
+ public CMNotification build() {
+ return new CMNotification(this);
+ }
+
+ public CMNotificationBuilder withCMBasicHeaderFieldsNotification(
+ CMBasicHeaderFieldsNotification basicHeaderFields) {
+ this.basicHeaderFields = basicHeaderFields;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMNotificationId(
+ String cmNotificationId) {
+ this.cmNotificationId = cmNotificationId;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMSourceIndicator(String cmSourceIndicator) {
+ this.cmSourceIndicator = cmSourceIndicator;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMValue(String cmValue) {
+ this.cmValue = cmValue;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMOperation(String cmOperation) {
+ this.cmOperation = cmOperation;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMPath(String cmPath) {
+ this.cmPath = cmPath;
+ return this;
+ }
+ }
+
+ public CMBasicHeaderFieldsNotification getBasicHeaderFields() {
+ return basicHeaderFields;
+ }
+
+ public String getCmSourceIndicator() {
+ return cmSourceIndicator;
+ }
+
+ public String getCmPath() {
+ return cmPath;
+ }
+
+ public String getCmNotificationId() {
+ return cmNotificationId;
+ }
+
+ public String getCmOperation() {
+ return cmOperation;
+ }
+
+ public String getCmValue() {
+ return cmValue;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
new file mode 100644
index 000000000..b13d4ea38
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CMNotificationClient extends MessageClient {
+
+ private static final String CM_NOTIFICATION_URI = "restconf/operations/devicemanager:push-cm-notification";
+ public static final String NODE_ID = "@node-id@", COUNTER = "@counter@", TIMESTAMP = "@timestamp@",
+ OBJECT_ID = "@object-id@", NOTIFICATION_TYPE = "@notification-type@", SOURCE_INDICATOR = "@source-indicator@",
+ NOTIFICATION_ID = "@notification-id@", PATH = "@path@", OPERATION = "@operation@", VALUE = "@value@";
+ public static final List<String> REQUIRED_FIELDS =
+ List.of(NODE_ID, COUNTER, TIMESTAMP, OBJECT_ID, NOTIFICATION_TYPE, NOTIFICATION_ID, SOURCE_INDICATOR, PATH,
+ OPERATION, VALUE);
+
+ private static final String CM_PAYLOAD = "{\n"
+ + " \"devicemanager:input\": {\n"
+ + " \"devicemanager:node-id\": \"" + NODE_ID + "\",\n"
+ + " \"devicemanager:counter\": \"" + COUNTER + "\",\n"
+ + " \"devicemanager:timestamp\": \"" + TIMESTAMP + "\",\n"
+ + " \"devicemanager:object-id\": \"" + OBJECT_ID + "\",\n"
+ + " \"devicemanager:notification-type\": \"" + NOTIFICATION_TYPE + "\",\n"
+ + " \"devicemanager:notification-id\": \"" + NOTIFICATION_ID + "\",\n"
+ + " \"devicemanager:source-indicator\": \"" + SOURCE_INDICATOR + "\",\n"
+ + " \"devicemanager:path\": \"" + PATH + "\",\n"
+ + " \"devicemanager:operation\": \"" + OPERATION + "\",\n"
+ + " \"devicemanager:value\": \"" + VALUE + "\"\n"
+ + " }\n"
+ + "}";
+
+ public CMNotificationClient(String baseUrl) {
+ super(baseUrl, CM_NOTIFICATION_URI);
+ }
+
+ @Override
+ public String prepareMessageFromPayloadMap(Map<String, String> notificationPayloadMap) {
+ return super.prepareMessageFromPayloadMap(notificationPayloadMap, CM_PAYLOAD, REQUIRED_FIELDS);
+ }
+
+ @Override
+ public boolean sendNotification(String message) {
+ return super.sendNotification(message, POST, MessageType.json);
+ }
+
+
+ public static Map<String, String> createCMNotificationPayloadMap(CMNotification cmNotification) {
+ HashMap<String, String> map = new HashMap<>();
+ map.put(NODE_ID, cmNotification.getBasicHeaderFields().getCmNodeId());
+ map.put(COUNTER, cmNotification.getBasicHeaderFields().getCmSequence());
+ map.put(TIMESTAMP, cmNotification.getBasicHeaderFields().getCmOccurrenceTime());
+ map.put(OBJECT_ID, cmNotification.getBasicHeaderFields().getSourceId());
+ map.put(NOTIFICATION_TYPE, cmNotification.getBasicHeaderFields().getNotificationType());
+ map.put(NOTIFICATION_ID, cmNotification.getCmNotificationId());
+ map.put(SOURCE_INDICATOR, cmNotification.getCmSourceIndicator());
+ map.put(PATH, cmNotification.getCmPath());
+ map.put(OPERATION, cmNotification.getCmOperation());
+ map.put(VALUE, cmNotification.getCmValue());
+ return map;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
index 245807ec3..8412e3730 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
@@ -19,6 +19,10 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Iterator;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,19 +39,96 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
LOG.debug("Processing CM message {}", msg);
JsonNode rootNode = convertMessageToJsonNode(msg);
- JsonNode dataNode;
- JsonNode notificationNode;
try {
- dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull();
- if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) {
- notificationNode = dataNode.get("moiChanges");
- LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId"));
+
+ String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
+
+ if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
+ LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
+ processMoiChanges(rootNode);
+ } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
+ LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
+ } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
+ LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
+ } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
+ LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
+ } else {
+ LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
+ throw new InvalidMessageException();
}
+
} catch (NullPointerException e) {
LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
throw new InvalidMessageException("Missing field");
}
- // take required data from notificationNode
}
+ private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
+ return CMBasicHeaderFieldsNotification.builder()
+ .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
+ .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
+ .withCMOccurrenceTime(Instant
+ .ofEpochMilli(
+ rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString())
+ .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
+ .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
+ .build();
+ }
+
+ private void processMoiChanges(JsonNode rootNode) {
+ Iterator<JsonNode> nodes = rootNode
+ .at("/event/stndDefinedFields/data/moiChanges")
+ .elements();
+ while (nodes.hasNext()) {
+ sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
+ }
+ }
+
+ public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
+ JsonNode slaidNode = nodes.next();
+ return CMNotificationClient.createCMNotificationPayloadMap(
+ CMNotification.builder()
+ .withCMBasicHeaderFieldsNotification(
+ prepareCMCommonHeaderFields(rootNode))
+ .withCMNotificationId(slaidNode.get("notificationId").toString())
+ .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
+ .withCMPath(slaidNode.get("path").textValue())
+ .withCMOperation(slaidNode.get("operation").textValue())
+ .withCMValue(slaidNode.get("value").toString()
+ .replace("\"", ""))
+ .build());
+ }
+
+ public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
+ return CMNotificationClient.createCMNotificationPayloadMap(
+ CMNotification.builder()
+ .withCMBasicHeaderFieldsNotification(
+ prepareCMCommonHeaderFields(rootNode))
+ .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
+ .withCMValue(rootNode.at(cmValueKey).toString()
+ .replace("\"", ""))
+ .build());
+ }
+
+ private void sendCMNotification(Map<String, String> payloadMapMessage) {
+ CMNotificationClient cmClient = setRESTConfAuthorization();
+ String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
+ cmClient.sendNotification(message);
+ }
+
+
+ private CMNotificationClient setRESTConfAuthorization() {
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
+ LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+ cmClient.setAuthorization(sdnrUser, sdnrPasswd);
+ return cmClient;
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
index fff243893..34b8d4031 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
@@ -80,6 +80,9 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
}
pauseThread();
}
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception reading from DMaaP VES Message Topic", e);
+ Thread.currentThread().interrupt();
} catch (JsonProcessingException jsonProcessingException) {
LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage());
} catch (InvalidMessageException invalidMessageException) {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
index c694f1d2f..3626f534a 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
@@ -28,189 +28,198 @@ import org.slf4j.LoggerFactory;
public class DMaaPVESMsgConsumerMain implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
- private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
- private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
- private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
- private static final String _PNFREG_DOMAIN = "pnfRegistration";
- private static final String _FAULT_DOMAIN = "fault";
- private static final String _CM_DOMAIN = "provisioning";
-
- boolean threadsRunning = false;
- List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
- private PNFRegistrationConfig pnfRegistrationConfig;
- private FaultConfig faultConfig;
- private GeneralConfig generalConfig;
- private ProvisioningConfig provisioningConfig;
-
- public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
- this.generalConfig = generalConfig;
- configMap.forEach(this::initialize);
- }
-
- public void initialize(String domain, MessageConfig domainConfig) {
- LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
- String consumerClass;
- Properties consumerProperties = new Properties();
- if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
- this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
- consumerClass = _PNFREG_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
-
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- pnfRegistrationConfig.getTransportType());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- pnfRegistrationConfig.getHostPort());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- pnfRegistrationConfig.getContenttype());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
- pnfRegistrationConfig.getConsumerGroup());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
- pnfRegistrationConfig.getConsumerId());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
- pnfRegistrationConfig.getTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
- pnfRegistrationConfig.getFetchPause());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
- pnfRegistrationConfig.getProtocol());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
- pnfRegistrationConfig.getUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
- pnfRegistrationConfig.getPassword());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- pnfRegistrationConfig.getClientReadTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- pnfRegistrationConfig.getClientConnectTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- pnfRegistrationConfig.getHTTPProxyURI());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- pnfRegistrationConfig.getHTTPProxyUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- pnfRegistrationConfig.getHTTPProxyPassword());
-
- threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
- } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
- this.faultConfig = (FaultConfig) domainConfig;
- consumerClass = _FAULT_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- faultConfig.getClientReadTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- faultConfig.getClientConnectTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- faultConfig.getHTTPProxyURI());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- faultConfig.getHTTPProxyUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- faultConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
- } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
- this.provisioningConfig = (ProvisioningConfig) domainConfig;
- consumerClass = _CM_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, provisioningConfig.getTransportType());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, provisioningConfig.getHostPort());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, provisioningConfig.getContenttype());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- provisioningConfig.getClientReadTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- provisioningConfig.getClientConnectTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- provisioningConfig.getHTTPProxyURI());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- provisioningConfig.getHTTPProxyUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- provisioningConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
- }
- }
-
- private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
- boolean threadsRunning = false;
- for (DMaaPVESMsgConsumer consumer : consumers) {
- if (consumer.isRunning()) {
- threadsRunning = true;
- }
- }
- return threadsRunning;
- }
-
- public boolean createConsumer(String consumerType, Properties properties) {
- DMaaPVESMsgConsumerImpl consumer = null;
-
- if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
- else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
- else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new DMaaPCMVESMsgConsumer(generalConfig);
-
- handleConsumer(consumer, properties, consumers);
- return !consumers.isEmpty();
- }
-
- private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
- List<DMaaPVESMsgConsumer> consumers) {
- if (consumer != null) {
- consumer.init(properties);
-
- if (consumer.isReady()) {
- Thread consumerThread = new Thread(consumer);
- consumerThread.start();
- consumers.add(consumer);
-
- LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
- return true;
- } else {
- LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
- }
- }
- return false;
- }
-
- @Override
- public void run() {
- while (threadsRunning) {
- threadsRunning = updateThreadState(consumers);
- if (!threadsRunning) {
- break;
- }
-
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- LOG.error(e.getLocalizedMessage(), e);
- }
- }
-
- LOG.info("No listener threads running - exiting");
- }
-
- public List<DMaaPVESMsgConsumer> getConsumers() {
- return consumers;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
+ private static final String _PNFREG_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
+ private static final String _FAULT_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
+ private static final String _CM_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
+ private static final String _PNFREG_DOMAIN = "pnfRegistration";
+ private static final String _FAULT_DOMAIN = "fault";
+ private static final String _CM_DOMAIN = "provisioning";
+
+ boolean threadsRunning = false;
+ List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
+ private PNFRegistrationConfig pnfRegistrationConfig;
+ private FaultConfig faultConfig;
+ private GeneralConfig generalConfig;
+ private ProvisioningConfig provisioningConfig;
+
+ public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
+ this.generalConfig = generalConfig;
+ configMap.forEach(this::initialize);
+ }
+
+ public void initialize(String domain, MessageConfig domainConfig) {
+ LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
+ String consumerClass;
+ Properties consumerProperties = new Properties();
+ if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
+ this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
+ consumerClass = _PNFREG_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ pnfRegistrationConfig.getTransportType());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
+ pnfRegistrationConfig.getHostPort());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ pnfRegistrationConfig.getContenttype());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ pnfRegistrationConfig.getConsumerGroup());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
+ pnfRegistrationConfig.getConsumerId());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
+ pnfRegistrationConfig.getTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ pnfRegistrationConfig.getFetchPause());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
+ pnfRegistrationConfig.getProtocol());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
+ pnfRegistrationConfig.getUsername());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
+ pnfRegistrationConfig.getPassword());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ pnfRegistrationConfig.getClientReadTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ pnfRegistrationConfig.getClientConnectTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ pnfRegistrationConfig.getHTTPProxyURI());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ pnfRegistrationConfig.getHTTPProxyUsername());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ pnfRegistrationConfig.getHTTPProxyPassword());
+
+ threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
+ } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
+ this.faultConfig = (FaultConfig) domainConfig;
+ consumerClass = _FAULT_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ faultConfig.getClientReadTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ faultConfig.getClientConnectTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ faultConfig.getHTTPProxyURI());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ faultConfig.getHTTPProxyUsername());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ faultConfig.getHTTPProxyPassword());
+ threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
+ } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
+ this.provisioningConfig = (ProvisioningConfig) domainConfig;
+ consumerClass = _CM_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ provisioningConfig.getTransportType());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
+ provisioningConfig.getHostPort());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ provisioningConfig.getContenttype());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ provisioningConfig.getConsumerGroup());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ provisioningConfig.getFetchPause());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ provisioningConfig.getClientReadTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ provisioningConfig.getClientConnectTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ provisioningConfig.getHTTPProxyURI());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ provisioningConfig.getHTTPProxyUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ provisioningConfig.getHTTPProxyPassword());
+ threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
+ }
+ }
+
+ private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
+ boolean threadsRunning = false;
+ for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (consumer.isRunning()) {
+ threadsRunning = true;
+ }
+ }
+ return threadsRunning;
+ }
+
+ public boolean createConsumer(String consumerType, Properties properties) {
+ DMaaPVESMsgConsumerImpl consumer = null;
+
+ if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
+ consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
+ else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
+ consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
+ else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
+ consumer = new DMaaPCMVESMsgConsumer(generalConfig);
+
+ handleConsumer(consumer, properties, consumers);
+ return !consumers.isEmpty();
+ }
+
+ private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
+ List<DMaaPVESMsgConsumer> consumers) {
+ if (consumer != null) {
+ consumer.init(properties);
+
+ if (consumer.isReady()) {
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+ consumers.add(consumer);
+
+ LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
+ return true;
+ } else {
+ LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void run() {
+ while (threadsRunning) {
+ threadsRunning = updateThreadState(consumers);
+ if (!threadsRunning) {
+ break;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ LOG.info("No listener threads running - exiting");
+ }
+
+ public List<DMaaPVESMsgConsumer> getConsumers() {
+ return consumers;
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java
new file mode 100644
index 000000000..5446da048
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
+
+public class TestCMBasicHeaderFieldsNotification {
+
+ private CMBasicHeaderFieldsNotification cmBasicFields;
+
+ @Test
+ public void testCMBasicFieldsBuilder() {
+ cmBasicFields = cmBasicFields.builder()
+ .withCMNodeId("test-node")
+ .withCMSequence("1")
+ .withCMOccurrenceTime("2021-10-18T15:25:19.948Z")
+ .withSourceId("src_device_id_1732")
+ .withNotificationType("notifyMOIChanges")
+ .build();
+
+ assertEquals("test-node", cmBasicFields.getCmNodeId());
+ assertEquals("1", cmBasicFields.getCmSequence());
+ assertEquals("src_device_id_1732", cmBasicFields.getSourceId());
+ assertEquals("2021-10-18T15:25:19.948Z", cmBasicFields.getCmOccurrenceTime());
+ assertEquals("notifyMOIChanges", cmBasicFields.getNotificationType());
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java
new file mode 100644
index 000000000..3b74df321
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotification;
+
+public class TestCMNotificationBuilder {
+
+ private CMNotification cmNotification;
+
+ @Test
+ public void testCMNotificationBuilderWithAllDefinedFields() {
+ cmNotification = cmNotification.builder()
+ .withCMBasicHeaderFieldsNotification(CMBasicHeaderFieldsNotification.builder()
+ .withCMNodeId("test-node")
+ .withCMSequence("1")
+ .withCMOccurrenceTime("2021-10-18T15:25:19.948Z")
+ .withSourceId("src_device_id_1732")
+ .withNotificationType("notifyMOIChanges")
+ .build())
+ .withCMNotificationId("123")
+ .withCMSourceIndicator("UNKNOWN")
+ .withCMPath("http://samsung.com/ves=1")
+ .withCMOperation("CREATE")
+ .withCMValue("value")
+ .build();
+
+ assertEquals("test-node", cmNotification.getBasicHeaderFields().getCmNodeId());
+ assertEquals("1", cmNotification.getBasicHeaderFields().getCmSequence());
+ assertEquals("src_device_id_1732", cmNotification.getBasicHeaderFields().getSourceId());
+ assertEquals("2021-10-18T15:25:19.948Z", cmNotification.getBasicHeaderFields().getCmOccurrenceTime());
+ assertEquals("notifyMOIChanges", cmNotification.getBasicHeaderFields().getNotificationType());
+ assertEquals("123", cmNotification.getCmNotificationId());
+ assertEquals("UNKNOWN", cmNotification.getCmSourceIndicator());
+ assertEquals("http://samsung.com/ves=1", cmNotification.getCmPath());
+ assertEquals("CREATE", cmNotification.getCmOperation());
+ assertEquals("value", cmNotification.getCmValue());
+ }
+
+ @Test
+ public void testCMNotificationBuilderWithDefaultCMOperation() {
+ cmNotification = cmNotification.builder()
+ .withCMBasicHeaderFieldsNotification(CMBasicHeaderFieldsNotification.builder()
+ .withCMNodeId("test-node")
+ .withCMSequence("1")
+ .withCMOccurrenceTime("2021-10-18T15:25:19.948Z")
+ .withSourceId("src_device_id_1732")
+ .withNotificationType("notifyMOIChanges")
+ .build())
+ .withCMNotificationId("123")
+ .withCMSourceIndicator("UNKNOWN")
+ .withCMPath("http://samsung.com/ves=1")
+ .build();
+
+ assertEquals("test-node", cmNotification.getBasicHeaderFields().getCmNodeId());
+ assertEquals("NULL", cmNotification.getCmOperation());
+ assertNull(cmNotification.getCmValue());
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java
new file mode 100644
index 000000000..12ccd4c62
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotificationClient;
+
+public class TestCMNotificationClient extends CMNotificationClient {
+ public static String baseUrl = "http://localhost:8181";
+ CMNotificationClient testClient;
+
+ public TestCMNotificationClient() {
+ super(baseUrl);
+ }
+
+ @Test
+ public void testCMNotificationClient() {
+ testClient = new TestCMNotificationClient();
+ testClient.setAuthorization("admin", "admin");
+
+ String msg = testClient.prepareMessageFromPayloadMap(
+ CMNotificationClient.createCMNotificationPayloadMap(
+ CMNotification.builder()
+ .withCMBasicHeaderFieldsNotification(CMBasicHeaderFieldsNotification.builder()
+ .withCMNodeId("test-node")
+ .withCMSequence("1")
+ .withCMOccurrenceTime("2021-10-18T15:25:19.948Z")
+ .withSourceId("src_device_id_1732")
+ .withNotificationType("notifyMOIChanges")
+ .build())
+ .withCMNotificationId("123")
+ .withCMSourceIndicator("UNKNOWN")
+ .withCMPath("http://samsung.com/ves=1")
+ .withCMOperation("CREATE")
+ .withCMValue("value")
+ .build()
+ ));
+ assertTrue(testClient.sendNotification(msg));
+ }
+
+ @Override
+ @Nonnull
+ public BaseHTTPResponse sendRequest(String uri, String method, String body, Map<String, String> headers) {
+ System.out.println("In overridden sendRequest in TestCMNotificationClient");
+ return new BaseHTTPResponse(200, body);
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java
index 0cd7f0228..2c4fb647b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java
@@ -30,7 +30,13 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import java.util.Iterator;
+import java.util.Map;
public class TestDMaaPCMVESMsgConsumer {
@@ -62,6 +68,14 @@ public class TestDMaaPCMVESMsgConsumer {
dMaaPCMVESMsgConsumer.processMsg(cmEvent);
}
+ @Test(expected = InvalidMessageException.class)
+ public void processMsgThatHasInvalidNotificationType()
+ throws URISyntaxException, IOException, InvalidMessageException {
+ File cmFileInvalid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_invalid_type.json").toURI());
+ String cmEvent = readFileToString(cmFileInvalid);
+ dMaaPCMVESMsgConsumer.processMsg(cmEvent);
+ }
+
@Test(expected = JsonProcessingException.class)
public void processMsgThatIsNotValidJson() throws URISyntaxException, IOException, InvalidMessageException {
File cmFileInvalid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/not_a_json.json").toURI());
@@ -69,12 +83,155 @@ public class TestDMaaPCMVESMsgConsumer {
dMaaPCMVESMsgConsumer.processMsg(cmEvent);
}
+ @Test
+ public void processMsgWithOneElementMoiChangesArray() throws URISyntaxException, IOException {
+ File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_valid.json").toURI());
+ String cmEvent = readFileToString(cmFileValid);
+ try {
+ JsonNode rootNode = convertMessageToJsonNode(cmEvent);
+ Iterator<JsonNode> nodes = rootNode
+ .at("/event/stndDefinedFields/data/moiChanges")
+ .elements();
+ Map<String, String> payloadMap =
+ dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+
+ assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
+ assertEquals("0", payloadMap.get("@counter@"));
+ assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
+ assertEquals("src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f", payloadMap.get("@object-id@"));
+ assertEquals("notifyMOIChanges", payloadMap.get("@notification-type@"));
+ assertEquals("123", payloadMap.get("@notification-id@"));
+ assertEquals("MANAGEMENT_OPERATION", payloadMap.get("@source-indicator@"));
+ assertEquals("https://samsung.com/3GPP/simulation/network-function/ves=1", payloadMap.get("@path@"));
+ assertEquals("REPLACE", payloadMap.get("@operation@"));
+ assertEquals("{pnf-registration:true,faults-enabled:true}", payloadMap.get("@value@"));
+
+ } catch (Exception e) {
+ fail("Test fail with message: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void processMsgWithTwoElementMoiChangesArray() throws URISyntaxException, IOException {
+ File cmFileValid =
+ new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_valid_two_element_moi_changes_array.json")
+ .toURI());
+ String cmEvent = readFileToString(cmFileValid);
+ try {
+ JsonNode rootNode = convertMessageToJsonNode(cmEvent);
+ Iterator<JsonNode> nodes = rootNode
+ .at("/event/stndDefinedFields/data/moiChanges")
+ .elements();
+ Map<String, String> payloadMap =
+ dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+
+ assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
+ assertEquals("0", payloadMap.get("@counter@"));
+ assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
+ assertEquals("src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f", payloadMap.get("@object-id@"));
+ assertEquals("notifyMOIChanges", payloadMap.get("@notification-type@"));
+ assertEquals("123", payloadMap.get("@notification-id@"));
+ assertEquals("MANAGEMENT_OPERATION", payloadMap.get("@source-indicator@"));
+ assertEquals("https://samsung.com/3GPP/simulation/network-function/ves=1", payloadMap.get("@path@"));
+ assertEquals("REPLACE", payloadMap.get("@operation@"));
+ assertEquals("{pnf-registration:true,faults-enabled:true}", payloadMap.get("@value@"));
+
+ Map<String, String> payloadMap2 = null;
+ while (nodes.hasNext()) {
+ payloadMap2 = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+ }
+ assertEquals("samsung-O-DU-1122", payloadMap2.get("@node-id@"));
+ assertEquals("124", payloadMap2.get("@notification-id@"));
+ assertEquals("RESOURCE_OPERATION", payloadMap2.get("@source-indicator@"));
+ assertEquals("https://samsung.com/3GPP/simulation/network-function/ves=2", payloadMap2.get("@path@"));
+ assertEquals("CREATE", payloadMap2.get("@operation@"));
+ assertEquals("{pnf-registration:false,faults-enabled:false}", payloadMap2.get("@value@"));
+
+ } catch (Exception e) {
+ fail("Test fail with message: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void processMsgNotifyMoiCreationType() throws URISyntaxException, IOException {
+ File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_creation.json").toURI());
+ String cmEvent = readFileToString(cmFileValid);
+ try {
+ JsonNode rootNode = convertMessageToJsonNode(cmEvent);
+ Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
+ assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
+ assertEquals("0", payloadMap.get("@counter@"));
+ assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
+ assertEquals("src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f", payloadMap.get("@object-id@"));
+ assertEquals("notifyMOICreation", payloadMap.get("@notification-type@"));
+ assertNull(payloadMap.get("@notification-id@"));
+ assertEquals("MANAGEMENT_OPERATION", payloadMap.get("@source-indicator@"));
+ assertNull(payloadMap.get("@path@"));
+ assertEquals("NULL", payloadMap.get("@operation@"));
+ assertEquals("{pnf-registration:true,faults-enabled:true}", payloadMap.get("@value@"));
+
+ } catch (Exception e) {
+ fail("Test fail with message: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void processMsgNotifyMoiDeletionType() throws URISyntaxException, IOException {
+ File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_deletion.json").toURI());
+ String cmEvent = readFileToString(cmFileValid);
+ try {
+ JsonNode rootNode = convertMessageToJsonNode(cmEvent);
+ Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
+ assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
+ assertEquals("0", payloadMap.get("@counter@"));
+ assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
+ assertEquals("src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f", payloadMap.get("@object-id@"));
+ assertEquals("notifyMOIDeletion", payloadMap.get("@notification-type@"));
+ assertNull(payloadMap.get("@notification-id@"));
+ assertEquals("MANAGEMENT_OPERATION", payloadMap.get("@source-indicator@"));
+ assertNull(payloadMap.get("@path@"));
+ assertEquals("NULL", payloadMap.get("@operation@"));
+ assertEquals("{pnf-registration:true,faults-enabled:true}", payloadMap.get("@value@"));
+
+ } catch (Exception e) {
+ fail("Test fail with message: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void processMsgNotifyMoiAttributeValueChangesType() throws URISyntaxException, IOException {
+ File cmFileValid =
+ new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_attribute_value_changes.json").toURI());
+ String cmEvent = readFileToString(cmFileValid);
+ try {
+ JsonNode rootNode = convertMessageToJsonNode(cmEvent);
+ Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges");
+ assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
+ assertEquals("0", payloadMap.get("@counter@"));
+ assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
+ assertEquals("src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f", payloadMap.get("@object-id@"));
+ assertEquals("notifyMOIAttributeValueChanges", payloadMap.get("@notification-type@"));
+ assertNull(payloadMap.get("@notification-id@"));
+ assertEquals("UNKNOWN", payloadMap.get("@source-indicator@"));
+ assertNull(payloadMap.get("@path@"));
+ assertEquals("NULL", payloadMap.get("@operation@"));
+ assertEquals("[{attributeNameValuePairSet:{faults-enabled:true}}]", payloadMap.get("@value@"));
+
+ } catch (Exception e) {
+ fail("Test fail with message: " + e.getMessage());
+ }
+ }
+
private String readFileToString(File file) throws IOException {
StringBuilder fileContent = new StringBuilder();
Files.lines(Paths.get(file.toURI())).forEach(fileContent::append);
return fileContent.toString();
}
+ private JsonNode convertMessageToJsonNode(String message) throws JsonProcessingException {
+ return new ObjectMapper().readTree(message);
+ }
+
@After
public void after() {
generalConfigForTest.close();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_invalid_type.json b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_invalid_type.json
new file mode 100644
index 000000000..ded304032
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_invalid_type.json
@@ -0,0 +1,49 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "stndDefinedNamespace": "3GPP-Provisioning",
+ "eventId": "cm0004012",
+ "eventName": "ves_stdnDefined_3GPP-Provisioning",
+ "nfNamingCode": "NFNC",
+ "nfVendorName": "POC",
+ "nfcNamingCode": "NFC",
+ "priority": "Medium",
+ "reportingEntityId": "device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "reportingEntityName": "samsung-O-DU-1122",
+ "sequence": 0,
+ "sourceId": "src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "sourceName": "samsung-O-DU-1122",
+ "startEpochMicrosec": 1547037007722752,
+ "lastEpochMicrosec": 1547037028498530,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel16/OpenAPI/provMnS.yaml#/components/schemas/NotifyMoiChanges",
+ "data": {
+ "href": "href1",
+ "notificationId": 1,
+ "notificationType": "type",
+ "eventTime": "2021-08-23T11:52:10.6Z",
+ "systemDN": "xyz",
+ "moiChanges": [
+ {
+ "notificationId": 123,
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "MANAGEMENT_OPERATION",
+ "path":"https://samsung.com/3GPP/simulation/network-function/ves=1",
+ "operation": "REPLACE",
+ "value": {
+ "pnf-registration": "true",
+ "faults-enabled": "true"
+ }
+ }
+ ]
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_attribute_value_changes.json b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_attribute_value_changes.json
new file mode 100644
index 000000000..088bdd3e5
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_attribute_value_changes.json
@@ -0,0 +1,45 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "stndDefinedNamespace": "3GPP-Provisioning",
+ "eventId": "cm0004012",
+ "eventName": "ves_stdnDefined_3GPP-Provisioning",
+ "nfNamingCode": "NFNC",
+ "nfVendorName": "POC",
+ "nfcNamingCode": "NFC",
+ "priority": "Medium",
+ "reportingEntityId": "device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "reportingEntityName": "samsung-O-DU-1122",
+ "sequence": 0,
+ "sourceId": "src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "sourceName": "samsung-O-DU-1122",
+ "startEpochMicrosec": 1547037007722752,
+ "lastEpochMicrosec": 1547037028498530,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel16/OpenAPI/provMnS.yaml#/components/schemas/NotifyMoiAttributeValueChanges",
+ "data": {
+ "href": "href1",
+ "notificationId": 1,
+ "notificationType": "notifyMOIAttributeValueChanges",
+ "eventTime": "2021-08-23T11:52:10.6Z",
+ "systemDN": "xyz",
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "UNKNOWN",
+ "attributeListValueChanges": [
+ {
+ "attributeNameValuePairSet": {
+ "faults-enabled": "true"
+ }
+ }
+ ]
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_creation.json b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_creation.json
new file mode 100644
index 000000000..a43edbb58
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_creation.json
@@ -0,0 +1,42 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "stndDefinedNamespace": "3GPP-Provisioning",
+ "eventId": "cm0004012",
+ "eventName": "ves_stdnDefined_3GPP-Provisioning",
+ "nfNamingCode": "NFNC",
+ "nfVendorName": "POC",
+ "nfcNamingCode": "NFC",
+ "priority": "Medium",
+ "reportingEntityId": "device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "reportingEntityName": "samsung-O-DU-1122",
+ "sequence": 0,
+ "sourceId": "src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "sourceName": "samsung-O-DU-1122",
+ "startEpochMicrosec": 1547037007722752,
+ "lastEpochMicrosec": 1547037028498530,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel16/OpenAPI/provMnS.yaml#/components/schemas/NotifyMoiCreation",
+ "data": {
+ "href": "href1",
+ "notificationId": 1,
+ "notificationType": "notifyMOICreation",
+ "eventTime": "2021-08-23T11:52:10.6Z",
+ "systemDN": "xyz",
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "MANAGEMENT_OPERATION",
+ "attributeList": {
+ "pnf-registration": "true",
+ "faults-enabled": "true"
+ }
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_deletion.json b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_deletion.json
new file mode 100644
index 000000000..28c2fefc2
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_moi_deletion.json
@@ -0,0 +1,42 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "stndDefinedNamespace": "3GPP-Provisioning",
+ "eventId": "cm0004012",
+ "eventName": "ves_stdnDefined_3GPP-Provisioning",
+ "nfNamingCode": "NFNC",
+ "nfVendorName": "POC",
+ "nfcNamingCode": "NFC",
+ "priority": "Medium",
+ "reportingEntityId": "device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "reportingEntityName": "samsung-O-DU-1122",
+ "sequence": 0,
+ "sourceId": "src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "sourceName": "samsung-O-DU-1122",
+ "startEpochMicrosec": 1547037007722752,
+ "lastEpochMicrosec": 1547037028498530,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel16/OpenAPI/provMnS.yaml#/components/schemas/NotifyMoiDeletion",
+ "data": {
+ "href": "href1",
+ "notificationId": 1,
+ "notificationType": "notifyMOIDeletion",
+ "eventTime": "2021-08-23T11:52:10.6Z",
+ "systemDN": "xyz",
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "MANAGEMENT_OPERATION",
+ "attributeList": {
+ "pnf-registration": "true",
+ "faults-enabled": "true"
+ }
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+} \ No newline at end of file
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_valid_two_element_moi_changes_array.json b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_valid_two_element_moi_changes_array.json
new file mode 100644
index 000000000..44008479d
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/resources/msgs/cm_valid_two_element_moi_changes_array.json
@@ -0,0 +1,61 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "stndDefinedNamespace": "3GPP-Provisioning",
+ "eventId": "cm0004012",
+ "eventName": "ves_stdnDefined_3GPP-Provisioning",
+ "nfNamingCode": "NFNC",
+ "nfVendorName": "POC",
+ "nfcNamingCode": "NFC",
+ "priority": "Medium",
+ "reportingEntityId": "device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "reportingEntityName": "samsung-O-DU-1122",
+ "sequence": 0,
+ "sourceId": "src_device_id_1732f1ad-53fd-4fd1-8b73-a677987d4e8f",
+ "sourceName": "samsung-O-DU-1122",
+ "startEpochMicrosec": 1547037007722752,
+ "lastEpochMicrosec": 1547037028498530,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/Rel16/OpenAPI/provMnS.yaml#/components/schemas/NotifyMoiChanges",
+ "data": {
+ "href": "href1",
+ "notificationId": 1,
+ "notificationType": "notifyMOIChanges",
+ "eventTime": "2021-08-23T11:52:10.6Z",
+ "systemDN": "xyz",
+ "moiChanges": [
+ {
+ "notificationId": 123,
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "MANAGEMENT_OPERATION",
+ "path": "https://samsung.com/3GPP/simulation/network-function/ves=1",
+ "operation": "REPLACE",
+ "value": {
+ "pnf-registration": "true",
+ "faults-enabled": "true"
+ }
+ },
+ {
+ "notificationId": 124,
+ "correlatedNotifications": [],
+ "additionalText": "AdditionalTextDetails",
+ "sourceIndicator": "RESOURCE_OPERATION",
+ "path": "https://samsung.com/3GPP/simulation/network-function/ves=2",
+ "operation": "CREATE",
+ "value": {
+ "pnf-registration": "false",
+ "faults-enabled": "false"
+ }
+ }
+ ]
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+} \ No newline at end of file