/* * ============LICENSE_START======================================================================== * ONAP : ccsdk feature sdnr wt mountpoint-registrar * ================================================================================================= * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. * ============LICENSE_END========================================================================== */ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import org.eclipse.jdt.annotation.Nullable; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaPNFRegVESMsgConsumer.class); private static final String DEFAULT_PROTOCOL = "SSH"; private static final String DEFAULT_PORT = "17830"; private static final String DEFAULT_USERNAME = "netconf"; private static final String DEFAULT_PASSWORD = "netconf"; public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { super(generalConfig); } @Override public void processMsg(String msg) { LOG.debug("Message from Kafka topic is - {} ", msg); String pnfId; String pnfIPAddress; @Nullable String pnfCommProtocol; @Nullable String pnfCommPort; @Nullable String pnfKeyId = null; @Nullable String pnfUsername; @Nullable String pnfPasswd = null; String reportingEntityName; ObjectMapper oMapper = new ObjectMapper(); JsonNode sKafkaMessageRootNode; try { sKafkaMessageRootNode = oMapper.readTree(msg); reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); if (reportingEntityName.equals("ONAP SDN-R")) { LOG.info( "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message"); return; } pnfId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); pnfIPAddress = getPNFIPAddress(sKafkaMessageRootNode); pnfCommProtocol = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue(); pnfCommPort = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue(); if (pnfCommProtocol != null) { if (pnfCommProtocol.equalsIgnoreCase("TLS")) { // Read username and keyId pnfKeyId = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue(); pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") .textValue(); } else if (pnfCommProtocol.equalsIgnoreCase("SSH")) { // Read username and password pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") .textValue(); pnfPasswd = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password") .textValue(); } else { // log warning - Unknown protocol LOG.warn("Only SSH and TLS protocols supported. Protocol specified in VES message is - {}", pnfCommProtocol, ". Defaulting to SSH"); pnfCommProtocol = DEFAULT_PROTOCOL; pnfCommPort = DEFAULT_PORT; pnfUsername = DEFAULT_USERNAME; pnfPasswd = DEFAULT_PASSWORD; } } else { LOG.warn("Protocol not specified in VES message, Defaulting to SSH"); pnfCommProtocol = DEFAULT_PROTOCOL; pnfCommPort = DEFAULT_PORT; pnfUsername = DEFAULT_USERNAME; pnfPasswd = DEFAULT_PASSWORD; } LOG.debug( "PNF Fields - ID - {} : IP Address - {} : Protocol - {} : TLS Key ID - {} : User - {} : Port - {}", pnfId, pnfIPAddress, pnfCommProtocol, pnfKeyId, pnfUsername, pnfCommPort); String baseUrl = getBaseUrl(); String sdnrUser = getSDNRUser(); String sdnrPasswd = getSDNRPasswd(); if (hasNullInRequiredField(pnfId, pnfIPAddress, pnfCommPort, pnfCommProtocol, pnfUsername)) { LOG.warn("One of the mandatory fields has a null value - pnfId = {} : pnfIPAddress = {} : " + "pnfCommProtocol = {} : pnfUsername {} : pnfCommPort {} - not invoking mountpoint creation", pnfId, pnfIPAddress, pnfCommProtocol, pnfUsername, pnfCommPort); return; } Map payloadMap = PNFMountPointClient.createPNFNotificationPayloadMap(pnfId, pnfIPAddress, pnfCommPort, pnfCommProtocol, pnfUsername, pnfPasswd, pnfKeyId); PNFMountPointClient mountPointClient = new PNFMountPointClient(baseUrl); LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd); mountPointClient.setAuthorization(sdnrUser, sdnrPasswd); String message = mountPointClient.prepareMessageFromPayloadMap(payloadMap); mountPointClient.sendNotification(message); } catch (IOException e) { LOG.info("Cannot parse json object, ignoring the received PNF Registration VES Message. Reason: {}", e.getMessage()); } } private boolean hasNullInRequiredField(String pnfId, String pnfIPAddress, String pnfCommPort, String pnfCommProtocol, String pnfUsername) { return pnfId == null || pnfIPAddress == null || pnfCommProtocol == null || pnfCommPort == null || pnfUsername == null; } private String getPNFIPAddress(JsonNode sKafkaMessageRootNode) { String ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue(); if (ipAddress != null && ipAddress != "") return ipAddress; ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue(); if (ipAddress != null && ipAddress != "") return ipAddress; return null; } }