diff options
author | Rafal Wrzesniak <r.wrzesniak@partner.samsung.com> | 2021-09-30 12:08:03 +0200 |
---|---|---|
committer | Rafal Wrzesniak <r.wrzesniak@partner.samsung.com> | 2021-10-15 14:38:06 +0200 |
commit | b9ed86210c28d251c7503b9b0a5b543d957614e8 (patch) | |
tree | beadba763f10c661e51976ce70249260ce29a39e /sdnr/wt/mountpoint-registrar/provider/src/main | |
parent | 8f862e52cc36716503c5aea4973031eb4e10d930 (diff) |
Adds handling CM provisioning messages
Adds config, consumer and changes to run it in MountpointRegistrar
Issue-ID: CCSDK-3464
Signed-off-by: Rafal Wrzesniak <r.wrzesniak@partner.samsung.com>
Change-Id: I800db7082dc4a84d73ac9e5dffd90c2f3c46ca82
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main')
5 files changed, 134 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java new file mode 100644 index 000000000..245807ec3 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class); + + public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); + LOG.info("DMaaPCMVESMsgConsumer started successfully"); + } + + @Override + public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException { + LOG.debug("Processing CM message {}", msg); + JsonNode rootNode = convertMessageToJsonNode(msg); + JsonNode dataNode; + JsonNode notificationNode; + try { + dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull(); + if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) { + notificationNode = dataNode.get("moiChanges"); + LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId")); + } + } catch (NullPointerException e) { + LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing"); + throw new InvalidMessageException("Missing field"); + } + // take required data from notificationNode + } + +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java index f6e70c6b5..fff243893 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java @@ -21,6 +21,8 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; import java.util.Properties; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -95,6 +97,10 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM return true; } + protected JsonNode convertMessageToJsonNode(String message) throws JsonProcessingException { + return new ObjectMapper().readTree(message); + } + /* * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc */ diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java index 7ce618582..c694f1d2f 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java @@ -31,14 +31,17 @@ public class DMaaPVESMsgConsumerMain implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class); private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer"; private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer"; + private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer"; private static final String _PNFREG_DOMAIN = "pnfRegistration"; private static final String _FAULT_DOMAIN = "fault"; + private static final String _CM_DOMAIN = "provisioning"; boolean threadsRunning = false; List<DMaaPVESMsgConsumer> consumers = new LinkedList<>(); private PNFRegistrationConfig pnfRegistrationConfig; private FaultConfig faultConfig; private GeneralConfig generalConfig; + private ProvisioningConfig provisioningConfig; public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; @@ -115,6 +118,33 @@ public class DMaaPVESMsgConsumerMain implements Runnable { consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, faultConfig.getHTTPProxyPassword()); threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties); + } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) { + this.provisioningConfig = (ProvisioningConfig) domainConfig; + consumerClass = _CM_CLASS; + LOG.debug("Consumer class = {}", consumerClass); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, provisioningConfig.getTransportType()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, provisioningConfig.getHostPort()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, provisioningConfig.getContenttype()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, + provisioningConfig.getClientReadTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, + provisioningConfig.getClientConnectTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, + provisioningConfig.getHTTPProxyURI()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, + provisioningConfig.getHTTPProxyUsername()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, + provisioningConfig.getHTTPProxyPassword()); + threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties); } } @@ -135,6 +165,8 @@ public class DMaaPVESMsgConsumerMain implements Runnable { consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) consumer = new DMaaPFaultVESMsgConsumer(generalConfig); + else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) + consumer = new DMaaPCMVESMsgConsumer(generalConfig); handleConsumer(consumer, properties, consumers); return !consumers.isEmpty(); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java index 0159ed4f8..136d2a12b 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java @@ -55,9 +55,11 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis generalConfig = new GeneralConfig(configFileRepresentation); PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation); FaultConfig faultConfig = new FaultConfig(configFileRepresentation); + ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation); configMap.put("pnfRegistration", pnfRegConfig); configMap.put("fault", faultConfig); + configMap.put("provisioning", provisioningConfig); dmaapEnabled = generalConfig.getEnabled(); if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java new file mode 100644 index 000000000..91a1f3fbe --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; + +public class ProvisioningConfig extends MessageConfig { + + private static final String SECTION_MARKER = "provisioning"; + private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_CM_TOPIC_USERNAME}"; + private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_CM_TOPIC_PASSWORD}"; + private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT"; + + public ProvisioningConfig(ConfigurationFileRepresentation configuration) { + super(configuration); + sectionMarker = SECTION_MARKER; + super.configuration.addSection(SECTION_MARKER); + super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, + DEFAULT_VALUE_CONSUMER_USERNAME); + super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, + DEFAULT_VALUE_CONSUMER_PASSWORD); + super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, + DEFAULT_VALUE_CONSUMER_TOPIC); + defaults(); + } +} |