summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main/java
diff options
context:
space:
mode:
authorJakub Dominik <j.dominik@samsung.com>2021-10-14 15:12:55 +0200
committerDan Timoney <dtimoney@att.com>2021-11-18 13:19:44 +0000
commit98aeaac496ac868a97ce9096c1c51ce9a133992a (patch)
treee3e4a52f786f4dbe271485366f0a0c8d74edc85f /sdnr/wt/mountpoint-registrar/provider/src/main/java
parent5d3bfaac4bc7a8b7b030757e1f35795667915a2b (diff)
Extend SDNC persistent service to store CM
Extend SDNC persistent service to store received CM events into Elasticsearch and MariaDB Issue-ID: CCSDK-3497 Signed-off-by: Jakub Dominik <j.dominik@samsung.com> Change-Id: I39983e59ef6512ad6c3864d47aebe1d615897146 Signed-off-by: Michael DÜrre <michael.duerre@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java100
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java113
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java81
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java95
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java3
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java377
6 files changed, 578 insertions, 191 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
new file mode 100644
index 000000000..98f02ec7a
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+public class CMBasicHeaderFieldsNotification {
+ private String cmNodeId;
+ private String cmSequence;
+ private String cmOccurrenceTime;
+ private String sourceId;
+ private String notificationType;
+
+ public static CMBasicHeaderFieldsNotificationBuilder builder() {
+ return new CMBasicHeaderFieldsNotificationBuilder();
+ }
+
+ private CMBasicHeaderFieldsNotification(
+ CMBasicHeaderFieldsNotification.CMBasicHeaderFieldsNotificationBuilder builder) {
+ this.cmNodeId = builder.cmNodeId;
+ this.cmSequence = builder.cmSequence;
+ this.cmOccurrenceTime = builder.cmOccurrenceTime;
+ this.sourceId = builder.sourceId;
+ this.notificationType = builder.notificationType;
+ }
+
+ public static class CMBasicHeaderFieldsNotificationBuilder {
+ private String cmNodeId;
+ private String cmSequence;
+ private String cmOccurrenceTime;
+ private String sourceId;
+ private String notificationType;
+
+ public CMBasicHeaderFieldsNotification build() {
+ return new CMBasicHeaderFieldsNotification(this);
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMNodeId(String cmNodeId) {
+ this.cmNodeId = cmNodeId;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMSequence(
+ String cmSequence) {
+ this.cmSequence = cmSequence;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withCMOccurrenceTime(
+ String cmOccurrenceTime) {
+ this.cmOccurrenceTime = cmOccurrenceTime;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withSourceId(String sourceId) {
+ this.sourceId = sourceId;
+ return this;
+ }
+
+ public CMBasicHeaderFieldsNotificationBuilder withNotificationType(
+ String notificationType) {
+ this.notificationType = notificationType;
+ return this;
+ }
+ }
+
+ public String getCmNodeId() {
+ return cmNodeId;
+ }
+
+ public String getCmSequence() {
+ return cmSequence;
+ }
+
+ public String getCmOccurrenceTime() {
+ return cmOccurrenceTime;
+ }
+
+ public String getNotificationType() {
+ return notificationType;
+ }
+
+ public String getSourceId() {
+ return sourceId;
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
new file mode 100644
index 000000000..014ff648d
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation;
+
+public class CMNotification {
+ private CMBasicHeaderFieldsNotification basicHeaderFields;
+ private String cmNotificationId;
+ private String cmSourceIndicator;
+ private String cmPath;
+ private String cmOperation;
+ private String cmValue;
+
+ public static CMNotificationBuilder builder() {
+ return new CMNotificationBuilder();
+ }
+
+ private CMNotification(CMNotificationBuilder builder) {
+ this.basicHeaderFields = builder.basicHeaderFields;
+ this.cmNotificationId = builder.cmNotificationId;
+ this.cmSourceIndicator = builder.cmSourceIndicator;
+ this.cmPath = builder.cmPath;
+ this.cmOperation = builder.cmOperation;
+ this.cmValue = builder.cmValue;
+ }
+
+ public static class CMNotificationBuilder {
+ private CMBasicHeaderFieldsNotification basicHeaderFields;
+ private String cmNotificationId;
+ private String cmSourceIndicator;
+ private String cmValue;
+ private String cmPath;
+
+ private String cmOperation = CmOperation.NULL.getName();
+
+
+ public CMNotification build() {
+ return new CMNotification(this);
+ }
+
+ public CMNotificationBuilder withCMBasicHeaderFieldsNotification(
+ CMBasicHeaderFieldsNotification basicHeaderFields) {
+ this.basicHeaderFields = basicHeaderFields;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMNotificationId(
+ String cmNotificationId) {
+ this.cmNotificationId = cmNotificationId;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMSourceIndicator(String cmSourceIndicator) {
+ this.cmSourceIndicator = cmSourceIndicator;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMValue(String cmValue) {
+ this.cmValue = cmValue;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMOperation(String cmOperation) {
+ this.cmOperation = cmOperation;
+ return this;
+ }
+
+ public CMNotificationBuilder withCMPath(String cmPath) {
+ this.cmPath = cmPath;
+ return this;
+ }
+ }
+
+ public CMBasicHeaderFieldsNotification getBasicHeaderFields() {
+ return basicHeaderFields;
+ }
+
+ public String getCmSourceIndicator() {
+ return cmSourceIndicator;
+ }
+
+ public String getCmPath() {
+ return cmPath;
+ }
+
+ public String getCmNotificationId() {
+ return cmNotificationId;
+ }
+
+ public String getCmOperation() {
+ return cmOperation;
+ }
+
+ public String getCmValue() {
+ return cmValue;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
new file mode 100644
index 000000000..b13d4ea38
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+
+import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CMNotificationClient extends MessageClient {
+
+ private static final String CM_NOTIFICATION_URI = "restconf/operations/devicemanager:push-cm-notification";
+ public static final String NODE_ID = "@node-id@", COUNTER = "@counter@", TIMESTAMP = "@timestamp@",
+ OBJECT_ID = "@object-id@", NOTIFICATION_TYPE = "@notification-type@", SOURCE_INDICATOR = "@source-indicator@",
+ NOTIFICATION_ID = "@notification-id@", PATH = "@path@", OPERATION = "@operation@", VALUE = "@value@";
+ public static final List<String> REQUIRED_FIELDS =
+ List.of(NODE_ID, COUNTER, TIMESTAMP, OBJECT_ID, NOTIFICATION_TYPE, NOTIFICATION_ID, SOURCE_INDICATOR, PATH,
+ OPERATION, VALUE);
+
+ private static final String CM_PAYLOAD = "{\n"
+ + " \"devicemanager:input\": {\n"
+ + " \"devicemanager:node-id\": \"" + NODE_ID + "\",\n"
+ + " \"devicemanager:counter\": \"" + COUNTER + "\",\n"
+ + " \"devicemanager:timestamp\": \"" + TIMESTAMP + "\",\n"
+ + " \"devicemanager:object-id\": \"" + OBJECT_ID + "\",\n"
+ + " \"devicemanager:notification-type\": \"" + NOTIFICATION_TYPE + "\",\n"
+ + " \"devicemanager:notification-id\": \"" + NOTIFICATION_ID + "\",\n"
+ + " \"devicemanager:source-indicator\": \"" + SOURCE_INDICATOR + "\",\n"
+ + " \"devicemanager:path\": \"" + PATH + "\",\n"
+ + " \"devicemanager:operation\": \"" + OPERATION + "\",\n"
+ + " \"devicemanager:value\": \"" + VALUE + "\"\n"
+ + " }\n"
+ + "}";
+
+ public CMNotificationClient(String baseUrl) {
+ super(baseUrl, CM_NOTIFICATION_URI);
+ }
+
+ @Override
+ public String prepareMessageFromPayloadMap(Map<String, String> notificationPayloadMap) {
+ return super.prepareMessageFromPayloadMap(notificationPayloadMap, CM_PAYLOAD, REQUIRED_FIELDS);
+ }
+
+ @Override
+ public boolean sendNotification(String message) {
+ return super.sendNotification(message, POST, MessageType.json);
+ }
+
+
+ public static Map<String, String> createCMNotificationPayloadMap(CMNotification cmNotification) {
+ HashMap<String, String> map = new HashMap<>();
+ map.put(NODE_ID, cmNotification.getBasicHeaderFields().getCmNodeId());
+ map.put(COUNTER, cmNotification.getBasicHeaderFields().getCmSequence());
+ map.put(TIMESTAMP, cmNotification.getBasicHeaderFields().getCmOccurrenceTime());
+ map.put(OBJECT_ID, cmNotification.getBasicHeaderFields().getSourceId());
+ map.put(NOTIFICATION_TYPE, cmNotification.getBasicHeaderFields().getNotificationType());
+ map.put(NOTIFICATION_ID, cmNotification.getCmNotificationId());
+ map.put(SOURCE_INDICATOR, cmNotification.getCmSourceIndicator());
+ map.put(PATH, cmNotification.getCmPath());
+ map.put(OPERATION, cmNotification.getCmOperation());
+ map.put(VALUE, cmNotification.getCmValue());
+ return map;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
index 245807ec3..8412e3730 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
@@ -19,6 +19,10 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Iterator;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,19 +39,96 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
LOG.debug("Processing CM message {}", msg);
JsonNode rootNode = convertMessageToJsonNode(msg);
- JsonNode dataNode;
- JsonNode notificationNode;
try {
- dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull();
- if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) {
- notificationNode = dataNode.get("moiChanges");
- LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId"));
+
+ String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
+
+ if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
+ LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
+ processMoiChanges(rootNode);
+ } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
+ LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
+ } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
+ LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
+ } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
+ LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
+ sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
+ } else {
+ LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
+ throw new InvalidMessageException();
}
+
} catch (NullPointerException e) {
LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
throw new InvalidMessageException("Missing field");
}
- // take required data from notificationNode
}
+ private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
+ return CMBasicHeaderFieldsNotification.builder()
+ .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
+ .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
+ .withCMOccurrenceTime(Instant
+ .ofEpochMilli(
+ rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString())
+ .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
+ .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
+ .build();
+ }
+
+ private void processMoiChanges(JsonNode rootNode) {
+ Iterator<JsonNode> nodes = rootNode
+ .at("/event/stndDefinedFields/data/moiChanges")
+ .elements();
+ while (nodes.hasNext()) {
+ sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
+ }
+ }
+
+ public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
+ JsonNode slaidNode = nodes.next();
+ return CMNotificationClient.createCMNotificationPayloadMap(
+ CMNotification.builder()
+ .withCMBasicHeaderFieldsNotification(
+ prepareCMCommonHeaderFields(rootNode))
+ .withCMNotificationId(slaidNode.get("notificationId").toString())
+ .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
+ .withCMPath(slaidNode.get("path").textValue())
+ .withCMOperation(slaidNode.get("operation").textValue())
+ .withCMValue(slaidNode.get("value").toString()
+ .replace("\"", ""))
+ .build());
+ }
+
+ public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
+ return CMNotificationClient.createCMNotificationPayloadMap(
+ CMNotification.builder()
+ .withCMBasicHeaderFieldsNotification(
+ prepareCMCommonHeaderFields(rootNode))
+ .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
+ .withCMValue(rootNode.at(cmValueKey).toString()
+ .replace("\"", ""))
+ .build());
+ }
+
+ private void sendCMNotification(Map<String, String> payloadMapMessage) {
+ CMNotificationClient cmClient = setRESTConfAuthorization();
+ String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
+ cmClient.sendNotification(message);
+ }
+
+
+ private CMNotificationClient setRESTConfAuthorization() {
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
+ LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+ cmClient.setAuthorization(sdnrUser, sdnrPasswd);
+ return cmClient;
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
index fff243893..34b8d4031 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
@@ -80,6 +80,9 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
}
pauseThread();
}
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception reading from DMaaP VES Message Topic", e);
+ Thread.currentThread().interrupt();
} catch (JsonProcessingException jsonProcessingException) {
LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage());
} catch (InvalidMessageException invalidMessageException) {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
index c694f1d2f..3626f534a 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
@@ -28,189 +28,198 @@ import org.slf4j.LoggerFactory;
public class DMaaPVESMsgConsumerMain implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
- private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
- private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
- private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
- private static final String _PNFREG_DOMAIN = "pnfRegistration";
- private static final String _FAULT_DOMAIN = "fault";
- private static final String _CM_DOMAIN = "provisioning";
-
- boolean threadsRunning = false;
- List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
- private PNFRegistrationConfig pnfRegistrationConfig;
- private FaultConfig faultConfig;
- private GeneralConfig generalConfig;
- private ProvisioningConfig provisioningConfig;
-
- public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
- this.generalConfig = generalConfig;
- configMap.forEach(this::initialize);
- }
-
- public void initialize(String domain, MessageConfig domainConfig) {
- LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
- String consumerClass;
- Properties consumerProperties = new Properties();
- if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
- this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
- consumerClass = _PNFREG_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
-
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- pnfRegistrationConfig.getTransportType());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- pnfRegistrationConfig.getHostPort());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- pnfRegistrationConfig.getContenttype());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
- pnfRegistrationConfig.getConsumerGroup());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
- pnfRegistrationConfig.getConsumerId());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
- pnfRegistrationConfig.getTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
- pnfRegistrationConfig.getFetchPause());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
- pnfRegistrationConfig.getProtocol());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
- pnfRegistrationConfig.getUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
- pnfRegistrationConfig.getPassword());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- pnfRegistrationConfig.getClientReadTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- pnfRegistrationConfig.getClientConnectTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- pnfRegistrationConfig.getHTTPProxyURI());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- pnfRegistrationConfig.getHTTPProxyUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- pnfRegistrationConfig.getHTTPProxyPassword());
-
- threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
- } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
- this.faultConfig = (FaultConfig) domainConfig;
- consumerClass = _FAULT_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- faultConfig.getClientReadTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- faultConfig.getClientConnectTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- faultConfig.getHTTPProxyURI());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- faultConfig.getHTTPProxyUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- faultConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
- } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
- this.provisioningConfig = (ProvisioningConfig) domainConfig;
- consumerClass = _CM_CLASS;
- LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, provisioningConfig.getTransportType());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, provisioningConfig.getHostPort());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, provisioningConfig.getContenttype());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- provisioningConfig.getClientReadTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- provisioningConfig.getClientConnectTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- provisioningConfig.getHTTPProxyURI());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- provisioningConfig.getHTTPProxyUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- provisioningConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
- }
- }
-
- private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
- boolean threadsRunning = false;
- for (DMaaPVESMsgConsumer consumer : consumers) {
- if (consumer.isRunning()) {
- threadsRunning = true;
- }
- }
- return threadsRunning;
- }
-
- public boolean createConsumer(String consumerType, Properties properties) {
- DMaaPVESMsgConsumerImpl consumer = null;
-
- if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
- else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
- else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new DMaaPCMVESMsgConsumer(generalConfig);
-
- handleConsumer(consumer, properties, consumers);
- return !consumers.isEmpty();
- }
-
- private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
- List<DMaaPVESMsgConsumer> consumers) {
- if (consumer != null) {
- consumer.init(properties);
-
- if (consumer.isReady()) {
- Thread consumerThread = new Thread(consumer);
- consumerThread.start();
- consumers.add(consumer);
-
- LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
- return true;
- } else {
- LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
- }
- }
- return false;
- }
-
- @Override
- public void run() {
- while (threadsRunning) {
- threadsRunning = updateThreadState(consumers);
- if (!threadsRunning) {
- break;
- }
-
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- LOG.error(e.getLocalizedMessage(), e);
- }
- }
-
- LOG.info("No listener threads running - exiting");
- }
-
- public List<DMaaPVESMsgConsumer> getConsumers() {
- return consumers;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
+ private static final String _PNFREG_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
+ private static final String _FAULT_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
+ private static final String _CM_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
+ private static final String _PNFREG_DOMAIN = "pnfRegistration";
+ private static final String _FAULT_DOMAIN = "fault";
+ private static final String _CM_DOMAIN = "provisioning";
+
+ boolean threadsRunning = false;
+ List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
+ private PNFRegistrationConfig pnfRegistrationConfig;
+ private FaultConfig faultConfig;
+ private GeneralConfig generalConfig;
+ private ProvisioningConfig provisioningConfig;
+
+ public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
+ this.generalConfig = generalConfig;
+ configMap.forEach(this::initialize);
+ }
+
+ public void initialize(String domain, MessageConfig domainConfig) {
+ LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
+ String consumerClass;
+ Properties consumerProperties = new Properties();
+ if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
+ this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
+ consumerClass = _PNFREG_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ pnfRegistrationConfig.getTransportType());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
+ pnfRegistrationConfig.getHostPort());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ pnfRegistrationConfig.getContenttype());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ pnfRegistrationConfig.getConsumerGroup());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
+ pnfRegistrationConfig.getConsumerId());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
+ pnfRegistrationConfig.getTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ pnfRegistrationConfig.getFetchPause());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
+ pnfRegistrationConfig.getProtocol());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
+ pnfRegistrationConfig.getUsername());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
+ pnfRegistrationConfig.getPassword());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ pnfRegistrationConfig.getClientReadTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ pnfRegistrationConfig.getClientConnectTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ pnfRegistrationConfig.getHTTPProxyURI());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ pnfRegistrationConfig.getHTTPProxyUsername());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ pnfRegistrationConfig.getHTTPProxyPassword());
+
+ threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
+ } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
+ this.faultConfig = (FaultConfig) domainConfig;
+ consumerClass = _FAULT_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ faultConfig.getClientReadTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ faultConfig.getClientConnectTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ faultConfig.getHTTPProxyURI());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ faultConfig.getHTTPProxyUsername());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ faultConfig.getHTTPProxyPassword());
+ threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
+ } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
+ this.provisioningConfig = (ProvisioningConfig) domainConfig;
+ consumerClass = _CM_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ provisioningConfig.getTransportType());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
+ provisioningConfig.getHostPort());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ provisioningConfig.getContenttype());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ provisioningConfig.getConsumerGroup());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ provisioningConfig.getFetchPause());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ provisioningConfig.getClientReadTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ provisioningConfig.getClientConnectTimeout());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
+ provisioningConfig.getHTTPProxyURI());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
+ provisioningConfig.getHTTPProxyUsername());
+ consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
+ provisioningConfig.getHTTPProxyPassword());
+ threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
+ }
+ }
+
+ private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
+ boolean threadsRunning = false;
+ for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (consumer.isRunning()) {
+ threadsRunning = true;
+ }
+ }
+ return threadsRunning;
+ }
+
+ public boolean createConsumer(String consumerType, Properties properties) {
+ DMaaPVESMsgConsumerImpl consumer = null;
+
+ if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
+ consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
+ else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
+ consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
+ else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
+ consumer = new DMaaPCMVESMsgConsumer(generalConfig);
+
+ handleConsumer(consumer, properties, consumers);
+ return !consumers.isEmpty();
+ }
+
+ private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
+ List<DMaaPVESMsgConsumer> consumers) {
+ if (consumer != null) {
+ consumer.init(properties);
+
+ if (consumer.isReady()) {
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+ consumers.add(consumer);
+
+ LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
+ return true;
+ } else {
+ LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void run() {
+ while (threadsRunning) {
+ threadsRunning = updateThreadState(consumers);
+ if (!threadsRunning) {
+ break;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ LOG.info("No listener threads running - exiting");
+ }
+
+ public List<DMaaPVESMsgConsumer> getConsumers() {
+ return consumers;
+ }
}