diff options
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java')
-rw-r--r-- | sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java new file mode 100644 index 000000000..147202fb8 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java @@ -0,0 +1,156 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import org.eclipse.jdt.annotation.Nullable; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaPNFRegVESMsgConsumer.class); + private static final String DEFAULT_PROTOCOL = "SSH"; + private static final String DEFAULT_PORT = "17830"; + private static final String DEFAULT_USERNAME = "netconf"; + private static final String DEFAULT_PASSWORD = "netconf"; + + + public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); + } + + @Override + public void processMsg(String msg) { + LOG.debug("Message from Kafka topic is - {} ", msg); + String pnfId; + String pnfIPAddress; + @Nullable + String pnfCommProtocol; + @Nullable + String pnfCommPort; + @Nullable + String pnfKeyId = null; + @Nullable + String pnfUsername; + @Nullable + String pnfPasswd = null; + String reportingEntityName; + ObjectMapper oMapper = new ObjectMapper(); + JsonNode sKafkaMessageRootNode; + try { + sKafkaMessageRootNode = oMapper.readTree(msg); + reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); + if (reportingEntityName.equals("ONAP SDN-R")) { + LOG.info( + "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message"); + return; + } + + pnfId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); + pnfIPAddress = getPNFIPAddress(sKafkaMessageRootNode); + pnfCommProtocol = + sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue(); + pnfCommPort = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue(); + if (pnfCommProtocol != null) { + if (pnfCommProtocol.equalsIgnoreCase("TLS")) { + // Read username and keyId + pnfKeyId = + sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue(); + pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") + .textValue(); + } else if (pnfCommProtocol.equalsIgnoreCase("SSH")) { + // Read username and password + pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") + .textValue(); + pnfPasswd = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password") + .textValue(); + } else { + // log warning - Unknown protocol + LOG.warn("Only SSH and TLS protocols supported. Protocol specified in VES message is - {}", + pnfCommProtocol, ". Defaulting to SSH"); + pnfCommProtocol = DEFAULT_PROTOCOL; + pnfCommPort = DEFAULT_PORT; + pnfUsername = DEFAULT_USERNAME; + pnfPasswd = DEFAULT_PASSWORD; + } + } else { + LOG.warn("Protocol not specified in VES message, Defaulting to SSH"); + pnfCommProtocol = DEFAULT_PROTOCOL; + pnfCommPort = DEFAULT_PORT; + pnfUsername = DEFAULT_USERNAME; + pnfPasswd = DEFAULT_PASSWORD; + } + + LOG.debug( + "PNF Fields - ID - {} : IP Address - {} : Protocol - {} : TLS Key ID - {} : User - {} : Port - {}", + pnfId, pnfIPAddress, pnfCommProtocol, pnfKeyId, pnfUsername, pnfCommPort); + + String baseUrl = getBaseUrl(); + String sdnrUser = getSDNRUser(); + String sdnrPasswd = getSDNRPasswd(); + + if (hasNullInRequiredField(pnfId, pnfIPAddress, pnfCommPort, pnfCommProtocol, pnfUsername)) { + LOG.warn("One of the mandatory fields has a null value - pnfId = {} : pnfIPAddress = {} : " + + "pnfCommProtocol = {} : pnfUsername {} : pnfCommPort {} - not invoking mountpoint creation", + pnfId, pnfIPAddress, pnfCommProtocol, pnfUsername, pnfCommPort); + return; + } + + Map<String, String> payloadMap = PNFMountPointClient.createPNFNotificationPayloadMap(pnfId, pnfIPAddress, + pnfCommPort, pnfCommProtocol, pnfUsername, pnfPasswd, pnfKeyId); + + PNFMountPointClient mountPointClient = new PNFMountPointClient(baseUrl); + LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd); + mountPointClient.setAuthorization(sdnrUser, sdnrPasswd); + String message = mountPointClient.prepareMessageFromPayloadMap(payloadMap); + mountPointClient.sendNotification(message); + + } catch (IOException e) { + LOG.info("Cannot parse json object, ignoring the received PNF Registration VES Message. Reason: {}", + e.getMessage()); + } + } + + private boolean hasNullInRequiredField(String pnfId, String pnfIPAddress, String pnfCommPort, + String pnfCommProtocol, String pnfUsername) { + + return pnfId == null || pnfIPAddress == null || pnfCommProtocol == null || + pnfCommPort == null || pnfUsername == null; + } + + private String getPNFIPAddress(JsonNode sKafkaMessageRootNode) { + String ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue(); + if (ipAddress != null && ipAddress != "") + return ipAddress; + + ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue(); + if (ipAddress != null && ipAddress != "") + return ipAddress; + + return null; + } + +} |