summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main
diff options
context:
space:
mode:
authorRafal Wrzesniak <r.wrzesniak@partner.samsung.com>2021-09-30 12:08:03 +0200
committerRafal Wrzesniak <r.wrzesniak@partner.samsung.com>2021-10-15 14:38:06 +0200
commitb9ed86210c28d251c7503b9b0a5b543d957614e8 (patch)
treebeadba763f10c661e51976ce70249260ce29a39e /sdnr/wt/mountpoint-registrar/provider/src/main
parent8f862e52cc36716503c5aea4973031eb4e10d930 (diff)
Adds handling CM provisioning messages
Adds config, consumer and changes to run it in MountpointRegistrar Issue-ID: CCSDK-3464 Signed-off-by: Rafal Wrzesniak <r.wrzesniak@partner.samsung.com> Change-Id: I800db7082dc4a84d73ac9e5dffd90c2f3c46ca82
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java53
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java6
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java32
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java41
5 files changed, 134 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
new file mode 100644
index 000000000..245807ec3
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
+
+ public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
+ super(generalConfig);
+ LOG.info("DMaaPCMVESMsgConsumer started successfully");
+ }
+
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
+ LOG.debug("Processing CM message {}", msg);
+ JsonNode rootNode = convertMessageToJsonNode(msg);
+ JsonNode dataNode;
+ JsonNode notificationNode;
+ try {
+ dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull();
+ if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) {
+ notificationNode = dataNode.get("moiChanges");
+ LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId"));
+ }
+ } catch (NullPointerException e) {
+ LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
+ throw new InvalidMessageException("Missing field");
+ }
+ // take required data from notificationNode
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
index f6e70c6b5..fff243893 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
@@ -21,6 +21,8 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import java.util.Properties;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -95,6 +97,10 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
return true;
}
+ protected JsonNode convertMessageToJsonNode(String message) throws JsonProcessingException {
+ return new ObjectMapper().readTree(message);
+ }
+
/*
* Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
*/
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
index 7ce618582..c694f1d2f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
@@ -31,14 +31,17 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
+ private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
private static final String _PNFREG_DOMAIN = "pnfRegistration";
private static final String _FAULT_DOMAIN = "fault";
+ private static final String _CM_DOMAIN = "provisioning";
boolean threadsRunning = false;
List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
private PNFRegistrationConfig pnfRegistrationConfig;
private FaultConfig faultConfig;
private GeneralConfig generalConfig;
+ private ProvisioningConfig provisioningConfig;
public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
@@ -115,6 +118,33 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
faultConfig.getHTTPProxyPassword());
threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
+ } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
+ this.provisioningConfig = (ProvisioningConfig) domainConfig;
+ consumerClass = _CM_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, provisioningConfig.getTransportType());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, provisioningConfig.getHostPort());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, provisioningConfig.getContenttype());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ provisioningConfig.getClientReadTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ provisioningConfig.getClientConnectTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ provisioningConfig.getHTTPProxyURI());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ provisioningConfig.getHTTPProxyUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ provisioningConfig.getHTTPProxyPassword());
+ threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
}
}
@@ -135,6 +165,8 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
+ else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
+ consumer = new DMaaPCMVESMsgConsumer(generalConfig);
handleConsumer(consumer, properties, consumers);
return !consumers.isEmpty();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index 0159ed4f8..136d2a12b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -55,9 +55,11 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
generalConfig = new GeneralConfig(configFileRepresentation);
PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
+ ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
configMap.put("pnfRegistration", pnfRegConfig);
configMap.put("fault", faultConfig);
+ configMap.put("provisioning", provisioningConfig);
dmaapEnabled = generalConfig.getEnabled();
if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java
new file mode 100644
index 000000000..91a1f3fbe
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+public class ProvisioningConfig extends MessageConfig {
+
+ private static final String SECTION_MARKER = "provisioning";
+ private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_CM_TOPIC_USERNAME}";
+ private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_CM_TOPIC_PASSWORD}";
+ private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT";
+
+ public ProvisioningConfig(ConfigurationFileRepresentation configuration) {
+ super(configuration);
+ sectionMarker = SECTION_MARKER;
+ super.configuration.addSection(SECTION_MARKER);
+ super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
+ DEFAULT_VALUE_CONSUMER_USERNAME);
+ super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
+ DEFAULT_VALUE_CONSUMER_PASSWORD);
+ super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
+ DEFAULT_VALUE_CONSUMER_TOPIC);
+ defaults();
+ }
+}