summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main/java
diff options
context:
space:
mode:
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2023-03-14 18:02:29 +0530
committerKAPIL SINGAL <ks220y@att.com>2023-03-14 14:07:58 +0000
commit3323a01bc3633dd723c1c7e9ad9488f89029bd1f (patch)
tree8c2f8aa84bc11615dbd990b97e9657ab4154b9f4 /sdnr/wt/mountpoint-registrar/provider/src/main/java
parentdc75699b17dfd4af719127e2e6f67d49b06e04e0 (diff)
Use Strimzi Kafka and Kafka native APIs
dmaapClient library is no longer used as DMaaP-MR is being deprecated Issue-ID: CCSDK-3784 Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> Change-Id: I12b9b7c8c57ad983a162e04ad8e76a57978fa9ee Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java)15
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java95
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java37
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java98
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java12
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java187
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java67
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java)99
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java)158
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java81
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java)5
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java)22
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java)35
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java)3
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java)40
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java141
24 files changed, 680 insertions, 453 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java
index 7c71f7ee7..42180bd44 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java
@@ -16,15 +16,13 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class FaultConfig extends MessageConfig {
private static final String SECTION_MARKER = "fault";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_FAULT_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_FAULT_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_FAULT_OUTPUT";
@@ -32,10 +30,6 @@ public class FaultConfig extends MessageConfig {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java
index eec4e7a9e..a8f920497 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java
@@ -15,25 +15,18 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
/**
* Configuration of mountpoint-registrar, general section<br>
- * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default
- * Configuration properties if none exist or exist partially Generates Consumer properties only for
- * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
- * generated by default. For a list of applicable properties for the different TranportType values, please see -
- * https://wiki.onap.org/display/DW/Feature+configuration+requirements
*/
public class GeneralConfig implements Configuration {
private static final String SECTION_MARKER = "general";
- private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled";
-
private static final String PROPERTY_KEY_USER = "sdnrUser";
private static final String DEFAULT_VALUE_USER = "${SDNRUSERNAME}";
@@ -52,10 +45,6 @@ public class GeneralConfig implements Configuration {
defaults();
}
- public Boolean getEnabled() {
- return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
- }
-
public String getBaseUrl() {
return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BASEURL);
}
@@ -75,8 +64,6 @@ public class GeneralConfig implements Configuration {
@Override
public void defaults() {
- // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BASEURL, DEFAULT_VALUE_BASEURL);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USER, DEFAULT_VALUE_USER);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USERPASSWD, DEFAULT_VALUE_USERPASSWD);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
new file mode 100644
index 000000000..3b3394454
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+
+public abstract class MessageConfig implements Configuration {
+ protected String sectionMarker;
+
+ public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
+
+ public static final String PROPERTY_KEY_CONSUMER_GROUP = "consumerGroup";
+ private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
+
+ public static final String PROPERTY_KEY_CONSUMER_ID = "consumerID";
+ private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
+
+ public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
+ private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+
+ public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
+ private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+
+ public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
+ private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
+
+ protected ConfigurationFileRepresentation configuration;
+
+ public MessageConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getSectionName() {
+ return sectionMarker;
+ }
+
+ @Override
+ public void defaults() {
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP,
+ DEFAULT_VALUE_CONSUMER_GROUP);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT,
+ DEFAULT_VALUE_CONSUMER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT,
+ DEFAULT_VALUE_CONSUMER_LIMIT);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC);
+ }
+
+ public String getConsumerGroup() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP);
+ }
+
+ public String getConsumerId() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT);
+ }
+
+ public String getFetchPause() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java
index acc0b3a89..a2292f576 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java
@@ -16,15 +16,13 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class PNFRegistrationConfig extends MessageConfig {
private static final String SECTION_MARKER = "pnfRegistration";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_PNFREG_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_PNFREG_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.VES_PNFREG_OUTPUT";
@@ -32,10 +30,6 @@ public class PNFRegistrationConfig extends MessageConfig {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java
index 91a1f3fbe..a2119dbc5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java
@@ -16,24 +16,18 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class ProvisioningConfig extends MessageConfig {
private static final String SECTION_MARKER = "provisioning";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_CM_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_CM_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT";
public ProvisioningConfig(ConfigurationFileRepresentation configuration) {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java
new file mode 100644
index 000000000..0a1381c2a
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+public class StndDefinedFaultConfig extends MessageConfig {
+
+ private static final String SECTION_MARKER = "stndDefinedFault";
+ private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT";
+
+ public StndDefinedFaultConfig(ConfigurationFileRepresentation configuration) {
+ super(configuration);
+ sectionMarker = SECTION_MARKER;
+ super.configuration.addSection(SECTION_MARKER);
+ super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
+ DEFAULT_VALUE_CONSUMER_TOPIC);
+ defaults();
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java
new file mode 100644
index 000000000..41ab8a7cb
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+/*
+ * [strimzi-kafka]
+ * bootstrapServers=abc:9092,def:9092
+ * securityProtocol=PLAINTEXT #OTHER POSSIBLE VALUES - SSL, SASL_PLAINTEXT, SASL_SSL
+ * saslMechanism=PLAIN #Need to understand more
+ * saslJaasConfig=
+ * consumerGroup=
+ * consumerID=
+ */
+public class StrimziKafkaConfig implements Configuration {
+
+ private static final String SECTION_MARKER = "strimzi-kafka";
+
+ private static final String PROPERTY_KEY_ENABLED = "strimziEnabled";
+
+ private static final String PROPERTY_KEY_BOOTSTRAPSERVERS = "bootstrapServers";
+ private static final String DEFAULT_VALUE_BOOTSTRAPSERVERS = "onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094";
+
+ private static final String PROPERTY_KEY_SECURITYPROTOCOL = "securityProtocol";
+ private static final String DEFAULT_VALUE_SECURITYPROTOCOL = "PLAINTEXT";
+
+ private static final String PROPERTY_KEY_SASLMECHANISM = "saslMechanism";
+ private static final String DEFAULT_VALUE_SASLMECHANISM = "PLAIN";
+
+ private static final String PROPERTY_KEY_SASLJAASCONFIG = "saslJaasConfig";
+ private static final String DEFAULT_VALUE_SASLJAASCONFIG = "PLAIN"; // TBD
+
+ private ConfigurationFileRepresentation configuration;
+
+ public StrimziKafkaConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ public Boolean getEnabled() {
+ return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
+ }
+
+ public String getBootstrapServers() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS);
+ }
+
+ public String getSecurityProtocol() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL);
+ }
+
+ public String getSaslMechanism() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM);
+ }
+
+ public String getSaslJaasConfig() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG);
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+ // The default value should be "false" given that SDNR can be run in
+ // environments where Strimzi is not used
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS,
+ DEFAULT_VALUE_BOOTSTRAPSERVERS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL,
+ DEFAULT_VALUE_SECURITYPROTOCOL);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM,
+ DEFAULT_VALUE_SASLMECHANISM);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG,
+ DEFAULT_VALUE_SASLJAASCONFIG);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
index 584982a5b..d9d487257 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
@@ -1,5 +1,10 @@
/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
* Copyright (C) 2021 Samsung Electronics
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -9,6 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
+ * ============LICENSE_END==========================================================================
*/
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -31,11 +37,11 @@ public abstract class MessageClient extends BaseHTTPClient {
protected final Map<String, String> headerMap;
private String notificationUri;
- protected enum SendMethod {
+ public /* protected */ enum SendMethod {
PUT, POST
}
- protected enum MessageType {
+ public /* protected */ enum MessageType {
xml, json
}
@@ -95,7 +101,7 @@ public abstract class MessageClient extends BaseHTTPClient {
try {
response = sendRequest(notificationUri, method.toString(), message, headerMap);
} catch (IOException e) {
- LOG.warn("Problem sending fault message: {}", e.getMessage());
+ LOG.warn("Problem sending message: {}", e.getMessage());
return false;
}
LOG.debug("Finished with response code {}", response.code);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java
deleted file mode 100644
index 8a6f6442e..000000000
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * ============LICENSE_START========================================================================
- * ONAP : ccsdk feature sdnr wt mountpoint-registrar
- * =================================================================================================
- * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
- * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-
-public abstract class MessageConfig implements Configuration {
- protected String sectionMarker;
-
- public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType";
- private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH";
-
- public static final String PROPERTY_KEY_CONSUMER_PROTOCOL = "Protocol";
- private static final String DEFAULT_VALUE_CONSUMER_PROTOCOL = "http";
-
- public static final String PROPERTY_KEY_CONSUMER_USERNAME = "username";
- public static final String PROPERTY_KEY_CONSUMER_PASSWORD = "password";
-
- public static final String PROPERTY_KEY_CONSUMER_HOST_PORT = "host";
- private static final String DEFAULT_VALUE_CONSUMER_HOST_PORT = "onap-dmaap:3904";
-
- public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
-
- public static final String PROPERTY_KEY_CONSUMER_CONTENTTYPE = "contenttype";
- private static final String DEFAULT_VALUE_CONSUMER_CONTENTTYPE = "application/json";
-
- public static final String PROPERTY_KEY_CONSUMER_GROUP = "group";
- private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
-
- public static final String PROPERTY_KEY_CONSUMER_ID = "id";
- private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
-
- public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
- private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
-
- public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
- private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
-
- public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
- private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT = "jersey.config.client.readTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT = "25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT = "jersey.config.client.connectTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT = "25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "jersey.config.client.proxy.username";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "${HTTP_PROXY_USERNAME}";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "jersey.config.client.proxy.password";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "${HTTP_PROXY_PASSWORD}";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI = "jersey.config.client.proxy.uri";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI = "${HTTP_PROXY_URI}";
-
- protected ConfigurationFileRepresentation configuration;
-
- public MessageConfig(ConfigurationFileRepresentation configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public String getSectionName() {
- return sectionMarker;
- }
-
- @Override
- public void defaults() {
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL,
- DEFAULT_VALUE_CONSUMER_PROTOCOL);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT,
- DEFAULT_VALUE_CONSUMER_HOST_PORT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP,
- DEFAULT_VALUE_CONSUMER_GROUP);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT,
- DEFAULT_VALUE_CONSUMER_TIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT,
- DEFAULT_VALUE_CONSUMER_LIMIT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
- DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI);
- }
-
-
-
- public String getHostPort() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT);
- }
-
- public String getTransportType() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
- }
-
- public String getProtocol() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL);
- }
-
- public String getUsername() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_USERNAME);
- }
-
- public String getPassword() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PASSWORD);
- }
-
- public String getTopic() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC);
- }
-
- public String getConsumerGroup() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP);
- }
-
- public String getConsumerId() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID);
- }
-
- public String getTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT);
- }
-
- public String getLimit() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT);
- }
-
- public String getFetchPause() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
- }
-
- public String getContenttype() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
- }
-
- public String getClientReadTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
- }
-
- public String getClientConnectTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
- }
-
- public String getHTTPProxyURI() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI);
- }
-
- public String getHTTPProxyUsername() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER);
- }
-
- public String getHTTPProxyPassword() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD);
- }
-}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index 136d2a12b..32d68ee62 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -24,6 +24,13 @@ import java.util.List;
import java.util.Map;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +40,13 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
private static final String APPLICATION_NAME = "mountpoint-registrar";
private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
- private Thread dmaapVESMsgConsumerMain = null;
+ private Thread sKafkaVESMsgConsumerMain = null;
private GeneralConfig generalConfig;
- private boolean dmaapEnabled = false;
+ private boolean strimziEnabled = false;
private Map<String, MessageConfig> configMap = new HashMap<>();
- private DMaaPVESMsgConsumerMain dmaapConsumerMain = null;
+ private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
+ private StrimziKafkaConfig strimziKafkaConfig;
// Blueprint 1
public MountpointRegistrarImpl() {
@@ -53,22 +61,25 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
configFileRepresentation.registerConfigChangedListener(this);
generalConfig = new GeneralConfig(configFileRepresentation);
+ strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
+ StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
configMap.put("pnfRegistration", pnfRegConfig);
configMap.put("fault", faultConfig);
configMap.put("provisioning", provisioningConfig);
-
- dmaapEnabled = generalConfig.getEnabled();
- if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
- LOG.info("DMaaP seems to be enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
+ configMap.put("stndDefinedFault", stndFaultConfig);
+
+ strimziEnabled = strimziKafkaConfig.getEnabled();
+ if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
+ LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
} else {
- LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
+ LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
}
}
@@ -83,26 +94,26 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
@Override
public void onConfigChanged() {
- if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
- LOG.warn("onConfigChange cannot be handled. Unexpected Null");
- return;
- }
- LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
- boolean dmaapEnabledNewVal = generalConfig.getEnabled();
- if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
- LOG.info("DMaaP is enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
- } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
- LOG.info("DMaaP is disabled, stopping consumer(s)");
- List<DMaaPVESMsgConsumer> consumers = dmaapConsumerMain.getConsumers();
- for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null");
+ return;
+ }
+ LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
+ boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
+ if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
+ } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
+ List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
+ for (StrimziKafkaVESMsgConsumer consumer : consumers) {
// stop all consumers
consumer.stopConsumer();
}
}
- dmaapEnabled = dmaapEnabledNewVal;
+ strimziEnabled = strimziEnabledNewVal;
}
@Override
@@ -125,6 +136,4 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
}
}
}
-
-
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java
index 2874c906f..2872384f1 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java
@@ -20,9 +20,9 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import java.util.Properties;
-public abstract interface DMaaPVESMsgConsumer extends Runnable {
+public abstract interface StrimziKafkaVESMsgConsumer extends Runnable {
- public abstract void init(Properties baseProperties);
+ public abstract void init(Properties strimziKafkaProperties, Properties properties);
public abstract void processMsg(String msg) throws Exception;//Implement something like InvalidMessageException;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 34b8d4031..249eb612e 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -19,38 +19,38 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-import java.util.Properties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import java.util.List;
+import java.util.Properties;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DMaaPVESMsgValidator {
+public abstract class StrimziKafkaVESMsgConsumerImpl
+ implements StrimziKafkaVESMsgConsumer, StrimziKafkaVESMsgValidator {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerImpl.class);
private static final String DEFAULT_SDNRUSER = "admin";
private static final String DEFAULT_SDNRPASSWD = "admin";
private final String name = this.getClass().getSimpleName();
- private Properties properties = null;
- private MRConsumer consumer = null;
+ private VESMsgKafkaConsumer consumer = null;
private boolean running = false;
private boolean ready = false;
private int fetchPause = 5000; // Default pause between fetch - 5 seconds
- private int timeout = 15000; // Default timeout - 15 seconds
protected final GeneralConfig generalConfig;
- protected DMaaPVESMsgConsumerImpl(GeneralConfig generalConfig) {
+ protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
}
/*
- * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
- * If no data arrives on the topic, sleeps for a certain time period before checking again
+ * Thread to fetch messages from the Kafka topic. Waits for the messages to
+ * arrive on the topic until a certain timeout and returns. If no data arrives
+ * on the topic, sleeps for a certain time period before checking again
*/
@Override
public void run() {
@@ -60,35 +60,28 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
while (running) {
try {
boolean noData = true;
- MRConsumerResponse consumerResponse = null;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
+ List<String> consumerResponse = null;
+ consumerResponse = consumer.poll();
+ for (String msg : consumerResponse) {
noData = false;
- LOG.debug("{} received ActualMessage from DMaaP VES Message topic {}", name,msg);
- if(isMessageValid(msg)) {
+ LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
+ if (isMessageValid(msg)) {
processMsg(msg);
}
}
if (noData) {
- LOG.debug("{} received ResponseCode: {}", name, consumerResponse.getResponseCode());
- LOG.debug("{} received ResponseMessage: {}", name, consumerResponse.getResponseMessage());
- if ((consumerResponse.getResponseCode() == null)
- && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
- LOG.warn("Client timeout while waiting for response from Server {}",
- consumerResponse.getResponseMessage());
- }
pauseThread();
}
- } catch (InterruptedException e) {
- LOG.warn("Caught exception reading from DMaaP VES Message Topic", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception reading from Kafka Message Topic", e);
Thread.currentThread().interrupt();
} catch (JsonProcessingException jsonProcessingException) {
LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage());
} catch (InvalidMessageException invalidMessageException) {
LOG.warn("Message is invalid because of: {}", invalidMessageException.getMessage());
} catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
+ LOG.error("Caught exception reading from Kafka Message Topic", e);
running = false;
}
}
@@ -105,50 +98,19 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
}
/*
- * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
+ * Create a Kafka consumer by specifying properties containing information such as
+ * topic name, timeout, URL etc
*/
@Override
- public void init(Properties properties) {
+ public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
try {
-
- String timeoutStr = properties.getProperty("timeout");
- LOG.debug("timeoutStr: {}", timeoutStr);
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- timeout = parseTimeOutValue(timeoutStr);
- }
-
- String fetchPauseStr = properties.getProperty("fetchPause");
- LOG.debug("fetchPause(Str): {}",fetchPauseStr);
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- fetchPause = parseFetchPause(fetchPauseStr);
- }
- LOG.debug("fetchPause: {} ",fetchPause);
-
- this.consumer = MRClientFactory.createConsumer(properties);
+ this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
+ this.consumer.subscribe(consumerProperties.getProperty("topic"));
ready = true;
} catch (Exception e) {
- LOG.error("Error initializing DMaaP VES Message consumer from file {} {}",properties, e);
- }
- }
-
- private int parseTimeOutValue(String timeoutStr) {
- try {
- return Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout ({})",timeoutStr);
- }
- return timeout;
- }
-
- private int parseFetchPause(String fetchPauseStr) {
- try {
- return Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for fetchPause ({})",fetchPauseStr);
+ LOG.error("Error initializing Kafka Message consumer from file {} {}", consumerProperties, e);
}
- return fetchPause;
}
private void pauseThread() throws InterruptedException {
@@ -170,16 +132,15 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
return running;
}
- public String getProperty(String name) {
- return properties.getProperty(name, "");
- }
-
+ /*
+ * public String getProperty(String name) { return properties.getProperty(name,
+ * ""); }
+ */
@Override
public void stopConsumer() {
running = false;
}
-
public String getBaseUrl() {
return generalConfig.getBaseUrl();
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
index 3626f534a..03573d85b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
@@ -23,34 +23,58 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPVESMsgConsumerMain implements Runnable {
+public class StrimziKafkaVESMsgConsumerMain implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
+ Properties strimziKafkaProperties = new Properties();
private static final String _PNFREG_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
private static final String _FAULT_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
private static final String _CM_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
+ private static final String _STNDDEFINED_FAULT_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
private static final String _PNFREG_DOMAIN = "pnfRegistration";
private static final String _FAULT_DOMAIN = "fault";
private static final String _CM_DOMAIN = "provisioning";
+ private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
boolean threadsRunning = false;
- List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
+ List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
private PNFRegistrationConfig pnfRegistrationConfig;
private FaultConfig faultConfig;
private GeneralConfig generalConfig;
private ProvisioningConfig provisioningConfig;
+ private StndDefinedFaultConfig stndDefinedFaultConfig;
+ private StrimziKafkaConfig strimziKafkaConfig;
- public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
+ public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
configMap.forEach(this::initialize);
}
+ public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
+ StrimziKafkaConfig strimziKafkaConfig) {
+ this.generalConfig = generalConfig;
+ this.strimziKafkaConfig = strimziKafkaConfig;
+ configMap.forEach(this::initialize);
+ }
+
public void initialize(String domain, MessageConfig domainConfig) {
LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
String consumerClass;
@@ -60,12 +84,6 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerClass = _PNFREG_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- pnfRegistrationConfig.getTransportType());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- pnfRegistrationConfig.getHostPort());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- pnfRegistrationConfig.getContenttype());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
pnfRegistrationConfig.getConsumerGroup());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
@@ -76,61 +94,26 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
pnfRegistrationConfig.getFetchPause());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
- pnfRegistrationConfig.getProtocol());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
- pnfRegistrationConfig.getUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
- pnfRegistrationConfig.getPassword());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- pnfRegistrationConfig.getClientReadTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- pnfRegistrationConfig.getClientConnectTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- pnfRegistrationConfig.getHTTPProxyURI());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- pnfRegistrationConfig.getHTTPProxyUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- pnfRegistrationConfig.getHTTPProxyPassword());
-
- threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
+
+ threadsRunning =
+ createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
} else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
this.faultConfig = (FaultConfig) domainConfig;
consumerClass = _FAULT_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- faultConfig.getClientReadTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- faultConfig.getClientConnectTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- faultConfig.getHTTPProxyURI());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- faultConfig.getHTTPProxyUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- faultConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
+
+ threadsRunning =
+ createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
} else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
this.provisioningConfig = (ProvisioningConfig) domainConfig;
consumerClass = _CM_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- provisioningConfig.getTransportType());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- provisioningConfig.getHostPort());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- provisioningConfig.getContenttype());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
provisioningConfig.getConsumerGroup());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
@@ -139,26 +122,43 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
provisioningConfig.getFetchPause());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- provisioningConfig.getClientReadTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- provisioningConfig.getClientConnectTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- provisioningConfig.getHTTPProxyURI());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- provisioningConfig.getHTTPProxyUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- provisioningConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
+
+ threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
+ } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
+ this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
+ consumerClass = _STNDDEFINED_FAULT_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ stndDefinedFaultConfig.getConsumerGroup());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
+ stndDefinedFaultConfig.getConsumerId());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
+ stndDefinedFaultConfig.getTopic());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
+ stndDefinedFaultConfig.getTimeout());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
+ stndDefinedFaultConfig.getLimit());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ stndDefinedFaultConfig.getFetchPause());
+
+ threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
+ getStrimziKafkaProps(strimziKafkaConfig));
+ }
+ }
+
+ private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
+ if (strimziKafkaProperties.size() == 0) {
+ strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
+ strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
+ strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
+ strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
}
+ return strimziKafkaProperties;
}
- private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
+ private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
boolean threadsRunning = false;
- for (DMaaPVESMsgConsumer consumer : consumers) {
+ for (StrimziKafkaVESMsgConsumer consumer : consumers) {
if (consumer.isRunning()) {
threadsRunning = true;
}
@@ -166,31 +166,33 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
return threadsRunning;
}
- public boolean createConsumer(String consumerType, Properties properties) {
- DMaaPVESMsgConsumerImpl consumer = null;
+ public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
+ StrimziKafkaVESMsgConsumerImpl consumer = null;
if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new DMaaPCMVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
+ else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
+ consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
- handleConsumer(consumer, properties, consumers);
+ handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
return !consumers.isEmpty();
}
- private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
- List<DMaaPVESMsgConsumer> consumers) {
+ private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
+ Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
if (consumer != null) {
- consumer.init(properties);
+ consumer.init(strimziKafkaProps, consumerProperties);
if (consumer.isReady()) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
consumers.add(consumer);
- LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
+ LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
return true;
} else {
LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
@@ -218,7 +220,7 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
LOG.info("No listener threads running - exiting");
}
- public List<DMaaPVESMsgConsumer> getConsumers() {
+ public List<StrimziKafkaVESMsgConsumer> getConsumers() {
return consumers;
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java
index 0532334ef..b1bd2fca5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java
@@ -18,7 +18,7 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-public interface DMaaPVESMsgValidator {
+public interface StrimziKafkaVESMsgValidator {
boolean isMessageValid(String message);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
new file mode 100644
index 000000000..b8dee44b0
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
@@ -0,0 +1,81 @@
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class VESMsgKafkaConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger(VESMsgKafkaConsumer.class);
+ final KafkaConsumer<String, String> consumer;
+ private final int pollTimeout;
+ private String topicName;
+ private static final String DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer";
+
+ /**
+ *
+ * @param consumerProperties
+ * @param configuration The config provided to the client
+ */
+ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
+ props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerGroup"));
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG,
+ consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerID"));
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS);
+ consumer = new KafkaConsumer<>(props);
+ pollTimeout = Integer.parseInt(consumerProperties.getProperty("timeout"));
+ }
+
+ /**
+ *
+ * @param topic The kafka topic to subscribe to
+ */
+ public void subscribe(String topic) {
+ try {
+ consumer.subscribe(Collections.singleton(topic));
+ this.topicName = topic;
+ } catch (InvalidGroupIdException e) {
+ log.error("Invalid Group {}", e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * @return The list of records returned from the poll
+ */
+ public List<String> poll() {
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout));
+ for (ConsumerRecord<String, String> rec : records) {
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java
index 98f02ec7a..9a01d53d1 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java
@@ -15,7 +15,7 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
public class CMBasicHeaderFieldsNotification {
private String cmNodeId;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java
index 014ff648d..8de9a21d0 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java
@@ -15,7 +15,7 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java
index 115f0f0c0..bdd83cce9 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java
@@ -16,7 +16,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST;
@@ -24,6 +24,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType;
+
public class CMNotificationClient extends MessageClient {
private static final String CM_NOTIFICATION_URI = "rests/operations/devicemanager:push-cm-notification";
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
index 8412e3730..c32d16273 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
@@ -16,23 +16,27 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Iterator;
import java.util.Map;
+
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
- public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
- LOG.info("DMaaPCMVESMsgConsumer started successfully");
+ LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
}
@Override
@@ -45,16 +49,16 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
- LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId);
processMoiChanges(rootNode);
} else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
- LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
} else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
- LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
} else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
- LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
} else {
LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java
index ce2538628..60a241831 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java
@@ -17,12 +17,14 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
+
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.*;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.*;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
index ee9d0220a..dc65732b4 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -25,15 +25,18 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
+
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPFaultVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
- public DMaaPFaultVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
}
@@ -48,32 +51,32 @@ public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
int faultSequence;
String reportingEntityName;
ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
+ JsonNode sKafkaMessageRootNode;
LOG.info("Fault VES Message is - {}", msg);
try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ sKafkaMessageRootNode = oMapper.readTree(msg);
+ reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
if (reportingEntityName.equals("ONAP SDN-R")) {
LOG.info(
- "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message");
+ "VES Fault message generated by SDNR, hence no need to process any further; Ignoring the received message");
return;
}
- vesDomain = dmaapMessageRootNode.at("/event/commonEventHeader/domain").textValue();
+ vesDomain = sKafkaMessageRootNode.at("/event/commonEventHeader/domain").textValue();
if (!vesDomain.equalsIgnoreCase("fault")) {
- LOG.warn("Received {} domain VES Message in DMaaP Fault topic, ignoring it", vesDomain);
+ LOG.warn("Received {} domain VES Message in Kafka Fault topic, ignoring it", vesDomain);
return;
}
- faultNodeId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ faultNodeId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
faultOccurrenceTime = Instant
.ofEpochMilli(
- dmaapMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ sKafkaMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
.atZone(ZoneId.of("Z")).toString();
- faultObjectId = dmaapMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
- faultReason = dmaapMessageRootNode.at("/event/faultFields/specificProblem").textValue();
- faultSeverity = dmaapMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
- faultSequence = dmaapMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
+ faultObjectId = sKafkaMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
+ faultReason = sKafkaMessageRootNode.at("/event/faultFields/specificProblem").textValue();
+ faultSeverity = sKafkaMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
+ faultSequence = sKafkaMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
if (faultSeverity.equalsIgnoreCase("critical")) {
faultSeverity = SeverityType.Critical.toString();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java
index fd31a3fd6..70402a878 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.xml;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.PUT;
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNull;
import org.onap.ccsdk.features.sdnr.wt.common.database.requests.BaseRequest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
public class PNFMountPointClient extends MessageClient {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
index 51d6d1950..147202fb8 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
@@ -17,32 +17,34 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jdt.annotation.Nullable;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPPNFRegVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaPNFRegVESMsgConsumer.class);
private static final String DEFAULT_PROTOCOL = "SSH";
private static final String DEFAULT_PORT = "17830";
private static final String DEFAULT_USERNAME = "netconf";
private static final String DEFAULT_PASSWORD = "netconf";
- public DMaaPPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
}
@Override
public void processMsg(String msg) {
- LOG.debug("Message from DMaaP topic is - {} ", msg);
+ LOG.debug("Message from Kafka topic is - {} ", msg);
String pnfId;
String pnfIPAddress;
@Nullable
@@ -57,33 +59,33 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
String pnfPasswd = null;
String reportingEntityName;
ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
+ JsonNode sKafkaMessageRootNode;
try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ sKafkaMessageRootNode = oMapper.readTree(msg);
+ reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
if (reportingEntityName.equals("ONAP SDN-R")) {
LOG.info(
"VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message");
return;
}
- pnfId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
- pnfIPAddress = getPNFIPAddress(dmaapMessageRootNode);
+ pnfId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ pnfIPAddress = getPNFIPAddress(sKafkaMessageRootNode);
pnfCommProtocol =
- dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
- pnfCommPort = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
+ sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
+ pnfCommPort = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
if (pnfCommProtocol != null) {
if (pnfCommProtocol.equalsIgnoreCase("TLS")) {
// Read username and keyId
pnfKeyId =
- dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
+ pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
.textValue();
} else if (pnfCommProtocol.equalsIgnoreCase("SSH")) {
// Read username and password
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
.textValue();
- pnfPasswd = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password")
+ pnfPasswd = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password")
.textValue();
} else {
// log warning - Unknown protocol
@@ -139,12 +141,12 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
pnfCommPort == null || pnfUsername == null;
}
- private String getPNFIPAddress(JsonNode dmaapMessageRootNode) {
- String ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue();
+ private String getPNFIPAddress(JsonNode sKafkaMessageRootNode) {
+ String ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue();
if (ipAddress != null && ipAddress != "")
return ipAddress;
- ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
+ ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
if (ipAddress != null && ipAddress != "")
return ipAddress;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
new file mode 100644
index 000000000..648809722
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
@@ -0,0 +1,141 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.FaultNotificationClient;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaStndDefinedFaultVESMsgConsumer.class);
+ Map<String, String> payloadMapMessage = null;
+ String faultNodeId;
+ String notificationType;
+
+ public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) {
+ super(generalConfig);
+ LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully");
+ }
+
+ /*
+ * Supports processing of notifyNewAlarm and notifyClearedAlarm messages ONLY
+ */
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
+ LOG.debug("Processing StndDefined Fault message {}", msg);
+ JsonNode rootNode = convertMessageToJsonNode(msg);
+ try {
+
+ faultNodeId = rootNode.at("/event/commonEventHeader/sourceName").textValue();
+ notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
+
+ if (notificationType.equalsIgnoreCase("notifyNewAlarm")) {
+ LOG.info("Read stndDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
+ faultNodeId);
+ processNewAlarm(rootNode);
+ } else if (notificationType.equalsIgnoreCase("notifyClearedAlarm")) {
+ LOG.info("Read stdnDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
+ faultNodeId);
+ processClearedAlarm(rootNode);
+ } else {
+ LOG.warn(
+ "Read stdnDefined Fault message of type - {} with id {} from Kafka topic. No suitable implementation for processing this message",
+ notificationType, faultNodeId);
+ throw new InvalidMessageException();
+ }
+ // Send Fault Notification
+ String baseUrl = getBaseUrl();
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl);
+ LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+ faultClient.setAuthorization(sdnrUser, sdnrPasswd);
+ String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage);
+ faultClient.sendNotification(message);
+ } catch (NullPointerException e) {
+ LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
+ throw new InvalidMessageException("Missing field");
+ }
+
+ }
+
+ private void processClearedAlarm(JsonNode rootNode) {
+ String faultOccurrenceTime =
+ Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString();
+ int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
+ String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
+ String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue();
+ String faultSeverity =
+ getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue());
+
+ payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
+ Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
+
+ }
+
+ private void processNewAlarm(JsonNode rootNode) {
+ String faultOccurrenceTime =
+ Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString();
+ int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
+ String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
+ String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue();
+ String faultSeverity =
+ getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue());
+
+ payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
+ Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
+
+ }
+
+ /*
+ * 3GPP Definition PerceivedSeverity: type: string enum: - INDETERMINATE -
+ * CRITICAL - MAJOR - MINOR - WARNING - CLEARED
+ *
+ */
+ private String getSDNRSeverityType(String faultSeverity) {
+ if (faultSeverity.equalsIgnoreCase("critical")) {
+ faultSeverity = SeverityType.Critical.toString();
+ } else if (faultSeverity.equalsIgnoreCase("major")) {
+ faultSeverity = SeverityType.Major.toString();
+ } else if (faultSeverity.equalsIgnoreCase("minor")) {
+ faultSeverity = SeverityType.Minor.toString();
+ } else if (faultSeverity.equalsIgnoreCase("warning") || faultSeverity.equalsIgnoreCase("indeterminate")) {
+ faultSeverity = SeverityType.Warning.toString();
+ } else if (faultSeverity.equalsIgnoreCase("cleared")) {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ } else {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ }
+ return faultSeverity;
+ }
+
+}