diff options
author | Jakub Dominik <j.dominik@samsung.com> | 2021-10-14 15:12:55 +0200 |
---|---|---|
committer | Dan Timoney <dtimoney@att.com> | 2021-11-18 13:19:44 +0000 |
commit | 98aeaac496ac868a97ce9096c1c51ce9a133992a (patch) | |
tree | e3e4a52f786f4dbe271485366f0a0c8d74edc85f /sdnr/wt/mountpoint-registrar/provider/src/main/java | |
parent | 5d3bfaac4bc7a8b7b030757e1f35795667915a2b (diff) |
Extend SDNC persistent service to store CM
Extend SDNC persistent service to store received CM events into Elasticsearch and MariaDB
Issue-ID: CCSDK-3497
Signed-off-by: Jakub Dominik <j.dominik@samsung.com>
Change-Id: I39983e59ef6512ad6c3864d47aebe1d615897146
Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java')
6 files changed, 578 insertions, 191 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java new file mode 100644 index 000000000..98f02ec7a --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; + +public class CMBasicHeaderFieldsNotification { + private String cmNodeId; + private String cmSequence; + private String cmOccurrenceTime; + private String sourceId; + private String notificationType; + + public static CMBasicHeaderFieldsNotificationBuilder builder() { + return new CMBasicHeaderFieldsNotificationBuilder(); + } + + private CMBasicHeaderFieldsNotification( + CMBasicHeaderFieldsNotification.CMBasicHeaderFieldsNotificationBuilder builder) { + this.cmNodeId = builder.cmNodeId; + this.cmSequence = builder.cmSequence; + this.cmOccurrenceTime = builder.cmOccurrenceTime; + this.sourceId = builder.sourceId; + this.notificationType = builder.notificationType; + } + + public static class CMBasicHeaderFieldsNotificationBuilder { + private String cmNodeId; + private String cmSequence; + private String cmOccurrenceTime; + private String sourceId; + private String notificationType; + + public CMBasicHeaderFieldsNotification build() { + return new CMBasicHeaderFieldsNotification(this); + } + + public CMBasicHeaderFieldsNotificationBuilder withCMNodeId(String cmNodeId) { + this.cmNodeId = cmNodeId; + return this; + } + + public CMBasicHeaderFieldsNotificationBuilder withCMSequence( + String cmSequence) { + this.cmSequence = cmSequence; + return this; + } + + public CMBasicHeaderFieldsNotificationBuilder withCMOccurrenceTime( + String cmOccurrenceTime) { + this.cmOccurrenceTime = cmOccurrenceTime; + return this; + } + + public CMBasicHeaderFieldsNotificationBuilder withSourceId(String sourceId) { + this.sourceId = sourceId; + return this; + } + + public CMBasicHeaderFieldsNotificationBuilder withNotificationType( + String notificationType) { + this.notificationType = notificationType; + return this; + } + } + + public String getCmNodeId() { + return cmNodeId; + } + + public String getCmSequence() { + return cmSequence; + } + + public String getCmOccurrenceTime() { + return cmOccurrenceTime; + } + + public String getNotificationType() { + return notificationType; + } + + public String getSourceId() { + return sourceId; + } + +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java new file mode 100644 index 000000000..014ff648d --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; + +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation; + +public class CMNotification { + private CMBasicHeaderFieldsNotification basicHeaderFields; + private String cmNotificationId; + private String cmSourceIndicator; + private String cmPath; + private String cmOperation; + private String cmValue; + + public static CMNotificationBuilder builder() { + return new CMNotificationBuilder(); + } + + private CMNotification(CMNotificationBuilder builder) { + this.basicHeaderFields = builder.basicHeaderFields; + this.cmNotificationId = builder.cmNotificationId; + this.cmSourceIndicator = builder.cmSourceIndicator; + this.cmPath = builder.cmPath; + this.cmOperation = builder.cmOperation; + this.cmValue = builder.cmValue; + } + + public static class CMNotificationBuilder { + private CMBasicHeaderFieldsNotification basicHeaderFields; + private String cmNotificationId; + private String cmSourceIndicator; + private String cmValue; + private String cmPath; + + private String cmOperation = CmOperation.NULL.getName(); + + + public CMNotification build() { + return new CMNotification(this); + } + + public CMNotificationBuilder withCMBasicHeaderFieldsNotification( + CMBasicHeaderFieldsNotification basicHeaderFields) { + this.basicHeaderFields = basicHeaderFields; + return this; + } + + public CMNotificationBuilder withCMNotificationId( + String cmNotificationId) { + this.cmNotificationId = cmNotificationId; + return this; + } + + public CMNotificationBuilder withCMSourceIndicator(String cmSourceIndicator) { + this.cmSourceIndicator = cmSourceIndicator; + return this; + } + + public CMNotificationBuilder withCMValue(String cmValue) { + this.cmValue = cmValue; + return this; + } + + public CMNotificationBuilder withCMOperation(String cmOperation) { + this.cmOperation = cmOperation; + return this; + } + + public CMNotificationBuilder withCMPath(String cmPath) { + this.cmPath = cmPath; + return this; + } + } + + public CMBasicHeaderFieldsNotification getBasicHeaderFields() { + return basicHeaderFields; + } + + public String getCmSourceIndicator() { + return cmSourceIndicator; + } + + public String getCmPath() { + return cmPath; + } + + public String getCmNotificationId() { + return cmNotificationId; + } + + public String getCmOperation() { + return cmOperation; + } + + public String getCmValue() { + return cmValue; + } +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java new file mode 100644 index 000000000..b13d4ea38 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; + +import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CMNotificationClient extends MessageClient { + + private static final String CM_NOTIFICATION_URI = "restconf/operations/devicemanager:push-cm-notification"; + public static final String NODE_ID = "@node-id@", COUNTER = "@counter@", TIMESTAMP = "@timestamp@", + OBJECT_ID = "@object-id@", NOTIFICATION_TYPE = "@notification-type@", SOURCE_INDICATOR = "@source-indicator@", + NOTIFICATION_ID = "@notification-id@", PATH = "@path@", OPERATION = "@operation@", VALUE = "@value@"; + public static final List<String> REQUIRED_FIELDS = + List.of(NODE_ID, COUNTER, TIMESTAMP, OBJECT_ID, NOTIFICATION_TYPE, NOTIFICATION_ID, SOURCE_INDICATOR, PATH, + OPERATION, VALUE); + + private static final String CM_PAYLOAD = "{\n" + + " \"devicemanager:input\": {\n" + + " \"devicemanager:node-id\": \"" + NODE_ID + "\",\n" + + " \"devicemanager:counter\": \"" + COUNTER + "\",\n" + + " \"devicemanager:timestamp\": \"" + TIMESTAMP + "\",\n" + + " \"devicemanager:object-id\": \"" + OBJECT_ID + "\",\n" + + " \"devicemanager:notification-type\": \"" + NOTIFICATION_TYPE + "\",\n" + + " \"devicemanager:notification-id\": \"" + NOTIFICATION_ID + "\",\n" + + " \"devicemanager:source-indicator\": \"" + SOURCE_INDICATOR + "\",\n" + + " \"devicemanager:path\": \"" + PATH + "\",\n" + + " \"devicemanager:operation\": \"" + OPERATION + "\",\n" + + " \"devicemanager:value\": \"" + VALUE + "\"\n" + + " }\n" + + "}"; + + public CMNotificationClient(String baseUrl) { + super(baseUrl, CM_NOTIFICATION_URI); + } + + @Override + public String prepareMessageFromPayloadMap(Map<String, String> notificationPayloadMap) { + return super.prepareMessageFromPayloadMap(notificationPayloadMap, CM_PAYLOAD, REQUIRED_FIELDS); + } + + @Override + public boolean sendNotification(String message) { + return super.sendNotification(message, POST, MessageType.json); + } + + + public static Map<String, String> createCMNotificationPayloadMap(CMNotification cmNotification) { + HashMap<String, String> map = new HashMap<>(); + map.put(NODE_ID, cmNotification.getBasicHeaderFields().getCmNodeId()); + map.put(COUNTER, cmNotification.getBasicHeaderFields().getCmSequence()); + map.put(TIMESTAMP, cmNotification.getBasicHeaderFields().getCmOccurrenceTime()); + map.put(OBJECT_ID, cmNotification.getBasicHeaderFields().getSourceId()); + map.put(NOTIFICATION_TYPE, cmNotification.getBasicHeaderFields().getNotificationType()); + map.put(NOTIFICATION_ID, cmNotification.getCmNotificationId()); + map.put(SOURCE_INDICATOR, cmNotification.getCmSourceIndicator()); + map.put(PATH, cmNotification.getCmPath()); + map.put(OPERATION, cmNotification.getCmOperation()); + map.put(VALUE, cmNotification.getCmValue()); + return map; + } +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java index 245807ec3..8412e3730 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java @@ -19,6 +19,10 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Iterator; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,19 +39,96 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl { public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException { LOG.debug("Processing CM message {}", msg); JsonNode rootNode = convertMessageToJsonNode(msg); - JsonNode dataNode; - JsonNode notificationNode; try { - dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull(); - if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) { - notificationNode = dataNode.get("moiChanges"); - LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId")); + + String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); + String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue(); + + if (notificationType.equalsIgnoreCase("notifyMOIChanges")) { + LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId); + processMoiChanges(rootNode); + } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) { + LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId); + sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList")); + } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) { + LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId); + sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList")); + } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) { + LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId); + sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges")); + } else { + LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType); + throw new InvalidMessageException(); } + } catch (NullPointerException e) { LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing"); throw new InvalidMessageException("Missing field"); } - // take required data from notificationNode } + private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) { + return CMBasicHeaderFieldsNotification.builder() + .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue()) + .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString()) + .withCMOccurrenceTime(Instant + .ofEpochMilli( + rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000) + .atZone(ZoneId.of("Z")).toString()) + .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue()) + .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue()) + .build(); + } + + private void processMoiChanges(JsonNode rootNode) { + Iterator<JsonNode> nodes = rootNode + .at("/event/stndDefinedFields/data/moiChanges") + .elements(); + while (nodes.hasNext()) { + sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes)); + } + } + + public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) { + JsonNode slaidNode = nodes.next(); + return CMNotificationClient.createCMNotificationPayloadMap( + CMNotification.builder() + .withCMBasicHeaderFieldsNotification( + prepareCMCommonHeaderFields(rootNode)) + .withCMNotificationId(slaidNode.get("notificationId").toString()) + .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue()) + .withCMPath(slaidNode.get("path").textValue()) + .withCMOperation(slaidNode.get("operation").textValue()) + .withCMValue(slaidNode.get("value").toString() + .replace("\"", "")) + .build()); + } + + public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){ + return CMNotificationClient.createCMNotificationPayloadMap( + CMNotification.builder() + .withCMBasicHeaderFieldsNotification( + prepareCMCommonHeaderFields(rootNode)) + .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue()) + .withCMValue(rootNode.at(cmValueKey).toString() + .replace("\"", "")) + .build()); + } + + private void sendCMNotification(Map<String, String> payloadMapMessage) { + CMNotificationClient cmClient = setRESTConfAuthorization(); + String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage); + cmClient.sendNotification(message); + } + + + private CMNotificationClient setRESTConfAuthorization() { + String sdnrUser = getSDNRUser(); + String sdnrPasswd = getSDNRPasswd(); + + CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl()); + LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd); + cmClient.setAuthorization(sdnrUser, sdnrPasswd); + return cmClient; + } } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java index fff243893..34b8d4031 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java @@ -80,6 +80,9 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM } pauseThread(); } + } catch (InterruptedException e) { + LOG.warn("Caught exception reading from DMaaP VES Message Topic", e); + Thread.currentThread().interrupt(); } catch (JsonProcessingException jsonProcessingException) { LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage()); } catch (InvalidMessageException invalidMessageException) { diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java index c694f1d2f..3626f534a 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java @@ -28,189 +28,198 @@ import org.slf4j.LoggerFactory; public class DMaaPVESMsgConsumerMain implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class); - private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer"; - private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer"; - private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer"; - private static final String _PNFREG_DOMAIN = "pnfRegistration"; - private static final String _FAULT_DOMAIN = "fault"; - private static final String _CM_DOMAIN = "provisioning"; - - boolean threadsRunning = false; - List<DMaaPVESMsgConsumer> consumers = new LinkedList<>(); - private PNFRegistrationConfig pnfRegistrationConfig; - private FaultConfig faultConfig; - private GeneralConfig generalConfig; - private ProvisioningConfig provisioningConfig; - - public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { - this.generalConfig = generalConfig; - configMap.forEach(this::initialize); - } - - public void initialize(String domain, MessageConfig domainConfig) { - LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig); - String consumerClass; - Properties consumerProperties = new Properties(); - if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) { - this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig; - consumerClass = _PNFREG_CLASS; - LOG.debug("Consumer class = {}", consumerClass); - - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, - pnfRegistrationConfig.getTransportType()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, - pnfRegistrationConfig.getHostPort()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, - pnfRegistrationConfig.getContenttype()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, - pnfRegistrationConfig.getConsumerGroup()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, - pnfRegistrationConfig.getConsumerId()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, - pnfRegistrationConfig.getTimeout()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, - pnfRegistrationConfig.getFetchPause()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, - pnfRegistrationConfig.getProtocol()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, - pnfRegistrationConfig.getUsername()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, - pnfRegistrationConfig.getPassword()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - pnfRegistrationConfig.getClientReadTimeout()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - pnfRegistrationConfig.getClientConnectTimeout()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - pnfRegistrationConfig.getHTTPProxyURI()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - pnfRegistrationConfig.getHTTPProxyUsername()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - pnfRegistrationConfig.getHTTPProxyPassword()); - - threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties); - } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) { - this.faultConfig = (FaultConfig) domainConfig; - consumerClass = _FAULT_CLASS; - LOG.debug("Consumer class = {}", consumerClass); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - faultConfig.getClientReadTimeout()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - faultConfig.getClientConnectTimeout()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - faultConfig.getHTTPProxyURI()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - faultConfig.getHTTPProxyUsername()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - faultConfig.getHTTPProxyPassword()); - threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties); - } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) { - this.provisioningConfig = (ProvisioningConfig) domainConfig; - consumerClass = _CM_CLASS; - LOG.debug("Consumer class = {}", consumerClass); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, provisioningConfig.getTransportType()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, provisioningConfig.getHostPort()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, provisioningConfig.getContenttype()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - provisioningConfig.getClientReadTimeout()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - provisioningConfig.getClientConnectTimeout()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - provisioningConfig.getHTTPProxyURI()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - provisioningConfig.getHTTPProxyUsername()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - provisioningConfig.getHTTPProxyPassword()); - threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties); - } - } - - private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) { - boolean threadsRunning = false; - for (DMaaPVESMsgConsumer consumer : consumers) { - if (consumer.isRunning()) { - threadsRunning = true; - } - } - return threadsRunning; - } - - public boolean createConsumer(String consumerType, Properties properties) { - DMaaPVESMsgConsumerImpl consumer = null; - - if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig); - else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new DMaaPFaultVESMsgConsumer(generalConfig); - else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new DMaaPCMVESMsgConsumer(generalConfig); - - handleConsumer(consumer, properties, consumers); - return !consumers.isEmpty(); - } - - private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties, - List<DMaaPVESMsgConsumer> consumers) { - if (consumer != null) { - consumer.init(properties); - - if (consumer.isReady()) { - Thread consumerThread = new Thread(consumer); - consumerThread.start(); - consumers.add(consumer); - - LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties); - return true; - } else { - LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName()); - } - } - return false; - } - - @Override - public void run() { - while (threadsRunning) { - threadsRunning = updateThreadState(consumers); - if (!threadsRunning) { - break; - } - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - LOG.error(e.getLocalizedMessage(), e); - } - } - - LOG.info("No listener threads running - exiting"); - } - - public List<DMaaPVESMsgConsumer> getConsumers() { - return consumers; - } + private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class); + private static final String _PNFREG_CLASS = + "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer"; + private static final String _FAULT_CLASS = + "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer"; + private static final String _CM_CLASS = + "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer"; + private static final String _PNFREG_DOMAIN = "pnfRegistration"; + private static final String _FAULT_DOMAIN = "fault"; + private static final String _CM_DOMAIN = "provisioning"; + + boolean threadsRunning = false; + List<DMaaPVESMsgConsumer> consumers = new LinkedList<>(); + private PNFRegistrationConfig pnfRegistrationConfig; + private FaultConfig faultConfig; + private GeneralConfig generalConfig; + private ProvisioningConfig provisioningConfig; + + public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { + this.generalConfig = generalConfig; + configMap.forEach(this::initialize); + } + + public void initialize(String domain, MessageConfig domainConfig) { + LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig); + String consumerClass; + Properties consumerProperties = new Properties(); + if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) { + this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig; + consumerClass = _PNFREG_CLASS; + LOG.debug("Consumer class = {}", consumerClass); + + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, + pnfRegistrationConfig.getTransportType()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, + pnfRegistrationConfig.getHostPort()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, + pnfRegistrationConfig.getContenttype()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, + pnfRegistrationConfig.getConsumerGroup()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, + pnfRegistrationConfig.getConsumerId()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, + pnfRegistrationConfig.getTimeout()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, + pnfRegistrationConfig.getFetchPause()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, + pnfRegistrationConfig.getProtocol()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, + pnfRegistrationConfig.getUsername()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, + pnfRegistrationConfig.getPassword()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, + pnfRegistrationConfig.getClientReadTimeout()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, + pnfRegistrationConfig.getClientConnectTimeout()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, + pnfRegistrationConfig.getHTTPProxyURI()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, + pnfRegistrationConfig.getHTTPProxyUsername()); + consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, + pnfRegistrationConfig.getHTTPProxyPassword()); + + threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties); + } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) { + this.faultConfig = (FaultConfig) domainConfig; + consumerClass = _FAULT_CLASS; + LOG.debug("Consumer class = {}", consumerClass); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, + faultConfig.getClientReadTimeout()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, + faultConfig.getClientConnectTimeout()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, + faultConfig.getHTTPProxyURI()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, + faultConfig.getHTTPProxyUsername()); + consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, + faultConfig.getHTTPProxyPassword()); + threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties); + } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) { + this.provisioningConfig = (ProvisioningConfig) domainConfig; + consumerClass = _CM_CLASS; + LOG.debug("Consumer class = {}", consumerClass); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, + provisioningConfig.getTransportType()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, + provisioningConfig.getHostPort()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, + provisioningConfig.getContenttype()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, + provisioningConfig.getConsumerGroup()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, + provisioningConfig.getFetchPause()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, + provisioningConfig.getClientReadTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, + provisioningConfig.getClientConnectTimeout()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, + provisioningConfig.getHTTPProxyURI()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, + provisioningConfig.getHTTPProxyUsername()); + consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, + provisioningConfig.getHTTPProxyPassword()); + threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties); + } + } + + private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) { + boolean threadsRunning = false; + for (DMaaPVESMsgConsumer consumer : consumers) { + if (consumer.isRunning()) { + threadsRunning = true; + } + } + return threadsRunning; + } + + public boolean createConsumer(String consumerType, Properties properties) { + DMaaPVESMsgConsumerImpl consumer = null; + + if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) + consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig); + else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) + consumer = new DMaaPFaultVESMsgConsumer(generalConfig); + else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) + consumer = new DMaaPCMVESMsgConsumer(generalConfig); + + handleConsumer(consumer, properties, consumers); + return !consumers.isEmpty(); + } + + private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties, + List<DMaaPVESMsgConsumer> consumers) { + if (consumer != null) { + consumer.init(properties); + + if (consumer.isReady()) { + Thread consumerThread = new Thread(consumer); + consumerThread.start(); + consumers.add(consumer); + + LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties); + return true; + } else { + LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName()); + } + } + return false; + } + + @Override + public void run() { + while (threadsRunning) { + threadsRunning = updateThreadState(consumers); + if (!threadsRunning) { + break; + } + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + LOG.error(e.getLocalizedMessage(), e); + Thread.currentThread().interrupt(); + } + } + + LOG.info("No listener threads running - exiting"); + } + + public List<DMaaPVESMsgConsumer> getConsumers() { + return consumers; + } } |