aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2023-03-15 13:40:53 +0000
committerGerrit Code Review <gerrit@onap.org>2023-03-15 13:40:53 +0000
commit2544e565c0a46cefca89d039c0f887b278e48e3b (patch)
treeeca9f7f2c8b5da6b7a623287737f5c6caa40e8fd
parenta958c2c5fefbc331af3d233e33afeb3b9d168427 (diff)
parent3323a01bc3633dd723c1c7e9ad9488f89029bd1f (diff)
Merge "Use Strimzi Kafka and Kafka native APIs"
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/pom.xml27
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java)15
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java95
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java37
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java98
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java12
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java187
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java67
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java)99
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java)158
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java81
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java)2
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java)5
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java)22
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java)35
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java)3
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java)40
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java141
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMBasicHeaderFieldsNotification.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationBuilder.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java)6
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java)8
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestFaultNotificationClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultNotificationClient.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestPNFMountPointClient.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestPNFMountPointClient.java)4
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/GeneralConfigForTest.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/GeneralConfigForTest.java)7
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/PNFRegistrationConfigTest.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/PNFRegistrationConfigTest.java)31
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestFaultConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultConfig.java)36
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestGeneralConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestGeneralConfig.java)5
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestProvisioningConfig.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestProvisioningConfig.java)38
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java73
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java)68
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPFaultVESMsgConsumer.java)10
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPPNFRegVESMsgConsumer.java)11
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java239
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaVESMsgConsumerMain.java (renamed from sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPVESMsgConsumerMain.java)25
41 files changed, 1105 insertions, 624 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/pom.xml b/sdnr/wt/mountpoint-registrar/provider/pom.xml
index 5cfd49bda..6f1676e9e 100644
--- a/sdnr/wt/mountpoint-registrar/provider/pom.xml
+++ b/sdnr/wt/mountpoint-registrar/provider/pom.xml
@@ -22,7 +22,9 @@
~ ============LICENSE_END=======================================================
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -48,10 +50,14 @@
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
- <buildtime>${maven.build.timestamp} UTC</buildtime>
+ <buildtime>${maven.build.timestamp} UTC</buildtime>
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>sdnr-wt-mountpoint-registrar-model</artifactId>
@@ -90,6 +96,11 @@
<artifactId>rfc6991-ietf-yang-types</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>jakarta.servlet</groupId>
+ <artifactId>jakarta.servlet-api</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>yang-binding</artifactId>
@@ -106,14 +117,10 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
- <artifactId>dmaapClient</artifactId>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ <scope>provided</scope>
</dependency>
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>annotations</artifactId>
- <scope>provided</scope>
- </dependency>
<!-- begin for testing -->
<dependency>
<groupId>org.mockito</groupId>
@@ -121,7 +128,7 @@
<scope>test</scope>
</dependency>
<!-- end for testing -->
-
+
</dependencies>
<build>
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java
index 7c71f7ee7..42180bd44 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java
@@ -16,15 +16,13 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class FaultConfig extends MessageConfig {
private static final String SECTION_MARKER = "fault";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_FAULT_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_FAULT_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_FAULT_OUTPUT";
@@ -32,10 +30,6 @@ public class FaultConfig extends MessageConfig {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java
index eec4e7a9e..a8f920497 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java
@@ -15,25 +15,18 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
/**
* Configuration of mountpoint-registrar, general section<br>
- * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default
- * Configuration properties if none exist or exist partially Generates Consumer properties only for
- * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
- * generated by default. For a list of applicable properties for the different TranportType values, please see -
- * https://wiki.onap.org/display/DW/Feature+configuration+requirements
*/
public class GeneralConfig implements Configuration {
private static final String SECTION_MARKER = "general";
- private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled";
-
private static final String PROPERTY_KEY_USER = "sdnrUser";
private static final String DEFAULT_VALUE_USER = "${SDNRUSERNAME}";
@@ -52,10 +45,6 @@ public class GeneralConfig implements Configuration {
defaults();
}
- public Boolean getEnabled() {
- return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
- }
-
public String getBaseUrl() {
return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BASEURL);
}
@@ -75,8 +64,6 @@ public class GeneralConfig implements Configuration {
@Override
public void defaults() {
- // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BASEURL, DEFAULT_VALUE_BASEURL);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USER, DEFAULT_VALUE_USER);
configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USERPASSWD, DEFAULT_VALUE_USERPASSWD);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
new file mode 100644
index 000000000..3b3394454
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+
+public abstract class MessageConfig implements Configuration {
+ protected String sectionMarker;
+
+ public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
+
+ public static final String PROPERTY_KEY_CONSUMER_GROUP = "consumerGroup";
+ private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
+
+ public static final String PROPERTY_KEY_CONSUMER_ID = "consumerID";
+ private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
+
+ public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
+ private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+
+ public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
+ private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+
+ public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
+ private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
+
+ protected ConfigurationFileRepresentation configuration;
+
+ public MessageConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getSectionName() {
+ return sectionMarker;
+ }
+
+ @Override
+ public void defaults() {
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP,
+ DEFAULT_VALUE_CONSUMER_GROUP);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT,
+ DEFAULT_VALUE_CONSUMER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT,
+ DEFAULT_VALUE_CONSUMER_LIMIT);
+ configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC);
+ }
+
+ public String getConsumerGroup() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP);
+ }
+
+ public String getConsumerId() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT);
+ }
+
+ public String getFetchPause() {
+ return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java
index acc0b3a89..a2292f576 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java
@@ -16,15 +16,13 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class PNFRegistrationConfig extends MessageConfig {
private static final String SECTION_MARKER = "pnfRegistration";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_PNFREG_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_PNFREG_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.VES_PNFREG_OUTPUT";
@@ -32,10 +30,6 @@ public class PNFRegistrationConfig extends MessageConfig {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java
index 91a1f3fbe..a2119dbc5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java
@@ -16,24 +16,18 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class ProvisioningConfig extends MessageConfig {
private static final String SECTION_MARKER = "provisioning";
- private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_CM_TOPIC_USERNAME}";
- private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_CM_TOPIC_PASSWORD}";
private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT";
public ProvisioningConfig(ConfigurationFileRepresentation configuration) {
super(configuration);
sectionMarker = SECTION_MARKER;
super.configuration.addSection(SECTION_MARKER);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
- DEFAULT_VALUE_CONSUMER_USERNAME);
- super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
- DEFAULT_VALUE_CONSUMER_PASSWORD);
super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
DEFAULT_VALUE_CONSUMER_TOPIC);
defaults();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java
new file mode 100644
index 000000000..0a1381c2a
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+public class StndDefinedFaultConfig extends MessageConfig {
+
+ private static final String SECTION_MARKER = "stndDefinedFault";
+ private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT";
+
+ public StndDefinedFaultConfig(ConfigurationFileRepresentation configuration) {
+ super(configuration);
+ sectionMarker = SECTION_MARKER;
+ super.configuration.addSection(SECTION_MARKER);
+ super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
+ DEFAULT_VALUE_CONSUMER_TOPIC);
+ defaults();
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java
new file mode 100644
index 000000000..41ab8a7cb
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+/*
+ * [strimzi-kafka]
+ * bootstrapServers=abc:9092,def:9092
+ * securityProtocol=PLAINTEXT #OTHER POSSIBLE VALUES - SSL, SASL_PLAINTEXT, SASL_SSL
+ * saslMechanism=PLAIN #Need to understand more
+ * saslJaasConfig=
+ * consumerGroup=
+ * consumerID=
+ */
+public class StrimziKafkaConfig implements Configuration {
+
+ private static final String SECTION_MARKER = "strimzi-kafka";
+
+ private static final String PROPERTY_KEY_ENABLED = "strimziEnabled";
+
+ private static final String PROPERTY_KEY_BOOTSTRAPSERVERS = "bootstrapServers";
+ private static final String DEFAULT_VALUE_BOOTSTRAPSERVERS = "onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094";
+
+ private static final String PROPERTY_KEY_SECURITYPROTOCOL = "securityProtocol";
+ private static final String DEFAULT_VALUE_SECURITYPROTOCOL = "PLAINTEXT";
+
+ private static final String PROPERTY_KEY_SASLMECHANISM = "saslMechanism";
+ private static final String DEFAULT_VALUE_SASLMECHANISM = "PLAIN";
+
+ private static final String PROPERTY_KEY_SASLJAASCONFIG = "saslJaasConfig";
+ private static final String DEFAULT_VALUE_SASLJAASCONFIG = "PLAIN"; // TBD
+
+ private ConfigurationFileRepresentation configuration;
+
+ public StrimziKafkaConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ public Boolean getEnabled() {
+ return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
+ }
+
+ public String getBootstrapServers() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS);
+ }
+
+ public String getSecurityProtocol() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL);
+ }
+
+ public String getSaslMechanism() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM);
+ }
+
+ public String getSaslJaasConfig() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG);
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+ // The default value should be "false" given that SDNR can be run in
+ // environments where Strimzi is not used
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS,
+ DEFAULT_VALUE_BOOTSTRAPSERVERS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL,
+ DEFAULT_VALUE_SECURITYPROTOCOL);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM,
+ DEFAULT_VALUE_SASLMECHANISM);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG,
+ DEFAULT_VALUE_SASLJAASCONFIG);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
index 584982a5b..d9d487257 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java
@@ -1,5 +1,10 @@
/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
* Copyright (C) 2021 Samsung Electronics
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -9,6 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
+ * ============LICENSE_END==========================================================================
*/
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -31,11 +37,11 @@ public abstract class MessageClient extends BaseHTTPClient {
protected final Map<String, String> headerMap;
private String notificationUri;
- protected enum SendMethod {
+ public /* protected */ enum SendMethod {
PUT, POST
}
- protected enum MessageType {
+ public /* protected */ enum MessageType {
xml, json
}
@@ -95,7 +101,7 @@ public abstract class MessageClient extends BaseHTTPClient {
try {
response = sendRequest(notificationUri, method.toString(), message, headerMap);
} catch (IOException e) {
- LOG.warn("Problem sending fault message: {}", e.getMessage());
+ LOG.warn("Problem sending message: {}", e.getMessage());
return false;
}
LOG.debug("Finished with response code {}", response.code);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java
deleted file mode 100644
index 8a6f6442e..000000000
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * ============LICENSE_START========================================================================
- * ONAP : ccsdk feature sdnr wt mountpoint-registrar
- * =================================================================================================
- * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
- * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-
-public abstract class MessageConfig implements Configuration {
- protected String sectionMarker;
-
- public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType";
- private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH";
-
- public static final String PROPERTY_KEY_CONSUMER_PROTOCOL = "Protocol";
- private static final String DEFAULT_VALUE_CONSUMER_PROTOCOL = "http";
-
- public static final String PROPERTY_KEY_CONSUMER_USERNAME = "username";
- public static final String PROPERTY_KEY_CONSUMER_PASSWORD = "password";
-
- public static final String PROPERTY_KEY_CONSUMER_HOST_PORT = "host";
- private static final String DEFAULT_VALUE_CONSUMER_HOST_PORT = "onap-dmaap:3904";
-
- public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
-
- public static final String PROPERTY_KEY_CONSUMER_CONTENTTYPE = "contenttype";
- private static final String DEFAULT_VALUE_CONSUMER_CONTENTTYPE = "application/json";
-
- public static final String PROPERTY_KEY_CONSUMER_GROUP = "group";
- private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
-
- public static final String PROPERTY_KEY_CONSUMER_ID = "id";
- private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
-
- public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
- private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
-
- public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
- private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
-
- public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
- private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT = "jersey.config.client.readTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT = "25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT = "jersey.config.client.connectTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT = "25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "jersey.config.client.proxy.username";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "${HTTP_PROXY_USERNAME}";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "jersey.config.client.proxy.password";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "${HTTP_PROXY_PASSWORD}";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI = "jersey.config.client.proxy.uri";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI = "${HTTP_PROXY_URI}";
-
- protected ConfigurationFileRepresentation configuration;
-
- public MessageConfig(ConfigurationFileRepresentation configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public String getSectionName() {
- return sectionMarker;
- }
-
- @Override
- public void defaults() {
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL,
- DEFAULT_VALUE_CONSUMER_PROTOCOL);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT,
- DEFAULT_VALUE_CONSUMER_HOST_PORT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP,
- DEFAULT_VALUE_CONSUMER_GROUP);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT,
- DEFAULT_VALUE_CONSUMER_TIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT,
- DEFAULT_VALUE_CONSUMER_LIMIT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
- DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD);
- configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI);
- }
-
-
-
- public String getHostPort() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT);
- }
-
- public String getTransportType() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
- }
-
- public String getProtocol() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL);
- }
-
- public String getUsername() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_USERNAME);
- }
-
- public String getPassword() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PASSWORD);
- }
-
- public String getTopic() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC);
- }
-
- public String getConsumerGroup() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP);
- }
-
- public String getConsumerId() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID);
- }
-
- public String getTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT);
- }
-
- public String getLimit() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT);
- }
-
- public String getFetchPause() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
- }
-
- public String getContenttype() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
- }
-
- public String getClientReadTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
- }
-
- public String getClientConnectTimeout() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
- }
-
- public String getHTTPProxyURI() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI);
- }
-
- public String getHTTPProxyUsername() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER);
- }
-
- public String getHTTPProxyPassword() {
- return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD);
- }
-}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index 136d2a12b..32d68ee62 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -24,6 +24,13 @@ import java.util.List;
import java.util.Map;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +40,13 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
private static final String APPLICATION_NAME = "mountpoint-registrar";
private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
- private Thread dmaapVESMsgConsumerMain = null;
+ private Thread sKafkaVESMsgConsumerMain = null;
private GeneralConfig generalConfig;
- private boolean dmaapEnabled = false;
+ private boolean strimziEnabled = false;
private Map<String, MessageConfig> configMap = new HashMap<>();
- private DMaaPVESMsgConsumerMain dmaapConsumerMain = null;
+ private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
+ private StrimziKafkaConfig strimziKafkaConfig;
// Blueprint 1
public MountpointRegistrarImpl() {
@@ -53,22 +61,25 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
configFileRepresentation.registerConfigChangedListener(this);
generalConfig = new GeneralConfig(configFileRepresentation);
+ strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
+ StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
configMap.put("pnfRegistration", pnfRegConfig);
configMap.put("fault", faultConfig);
configMap.put("provisioning", provisioningConfig);
-
- dmaapEnabled = generalConfig.getEnabled();
- if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
- LOG.info("DMaaP seems to be enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
+ configMap.put("stndDefinedFault", stndFaultConfig);
+
+ strimziEnabled = strimziKafkaConfig.getEnabled();
+ if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
+ LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
} else {
- LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
+ LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
}
}
@@ -83,26 +94,26 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
@Override
public void onConfigChanged() {
- if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
- LOG.warn("onConfigChange cannot be handled. Unexpected Null");
- return;
- }
- LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
- boolean dmaapEnabledNewVal = generalConfig.getEnabled();
- if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
- LOG.info("DMaaP is enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
- } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
- LOG.info("DMaaP is disabled, stopping consumer(s)");
- List<DMaaPVESMsgConsumer> consumers = dmaapConsumerMain.getConsumers();
- for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null");
+ return;
+ }
+ LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
+ boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
+ if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
+ } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
+ List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
+ for (StrimziKafkaVESMsgConsumer consumer : consumers) {
// stop all consumers
consumer.stopConsumer();
}
}
- dmaapEnabled = dmaapEnabledNewVal;
+ strimziEnabled = strimziEnabledNewVal;
}
@Override
@@ -125,6 +136,4 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis
}
}
}
-
-
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java
index 2874c906f..2872384f1 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java
@@ -20,9 +20,9 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import java.util.Properties;
-public abstract interface DMaaPVESMsgConsumer extends Runnable {
+public abstract interface StrimziKafkaVESMsgConsumer extends Runnable {
- public abstract void init(Properties baseProperties);
+ public abstract void init(Properties strimziKafkaProperties, Properties properties);
public abstract void processMsg(String msg) throws Exception;//Implement something like InvalidMessageException;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 34b8d4031..249eb612e 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -19,38 +19,38 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-import java.util.Properties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import java.util.List;
+import java.util.Properties;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DMaaPVESMsgValidator {
+public abstract class StrimziKafkaVESMsgConsumerImpl
+ implements StrimziKafkaVESMsgConsumer, StrimziKafkaVESMsgValidator {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerImpl.class);
private static final String DEFAULT_SDNRUSER = "admin";
private static final String DEFAULT_SDNRPASSWD = "admin";
private final String name = this.getClass().getSimpleName();
- private Properties properties = null;
- private MRConsumer consumer = null;
+ private VESMsgKafkaConsumer consumer = null;
private boolean running = false;
private boolean ready = false;
private int fetchPause = 5000; // Default pause between fetch - 5 seconds
- private int timeout = 15000; // Default timeout - 15 seconds
protected final GeneralConfig generalConfig;
- protected DMaaPVESMsgConsumerImpl(GeneralConfig generalConfig) {
+ protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
}
/*
- * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
- * If no data arrives on the topic, sleeps for a certain time period before checking again
+ * Thread to fetch messages from the Kafka topic. Waits for the messages to
+ * arrive on the topic until a certain timeout and returns. If no data arrives
+ * on the topic, sleeps for a certain time period before checking again
*/
@Override
public void run() {
@@ -60,35 +60,28 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
while (running) {
try {
boolean noData = true;
- MRConsumerResponse consumerResponse = null;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
+ List<String> consumerResponse = null;
+ consumerResponse = consumer.poll();
+ for (String msg : consumerResponse) {
noData = false;
- LOG.debug("{} received ActualMessage from DMaaP VES Message topic {}", name,msg);
- if(isMessageValid(msg)) {
+ LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg);
+ if (isMessageValid(msg)) {
processMsg(msg);
}
}
if (noData) {
- LOG.debug("{} received ResponseCode: {}", name, consumerResponse.getResponseCode());
- LOG.debug("{} received ResponseMessage: {}", name, consumerResponse.getResponseMessage());
- if ((consumerResponse.getResponseCode() == null)
- && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
- LOG.warn("Client timeout while waiting for response from Server {}",
- consumerResponse.getResponseMessage());
- }
pauseThread();
}
- } catch (InterruptedException e) {
- LOG.warn("Caught exception reading from DMaaP VES Message Topic", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception reading from Kafka Message Topic", e);
Thread.currentThread().interrupt();
} catch (JsonProcessingException jsonProcessingException) {
LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage());
} catch (InvalidMessageException invalidMessageException) {
LOG.warn("Message is invalid because of: {}", invalidMessageException.getMessage());
} catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
+ LOG.error("Caught exception reading from Kafka Message Topic", e);
running = false;
}
}
@@ -105,50 +98,19 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
}
/*
- * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
+ * Create a Kafka consumer by specifying properties containing information such as
+ * topic name, timeout, URL etc
*/
@Override
- public void init(Properties properties) {
+ public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
try {
-
- String timeoutStr = properties.getProperty("timeout");
- LOG.debug("timeoutStr: {}", timeoutStr);
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- timeout = parseTimeOutValue(timeoutStr);
- }
-
- String fetchPauseStr = properties.getProperty("fetchPause");
- LOG.debug("fetchPause(Str): {}",fetchPauseStr);
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- fetchPause = parseFetchPause(fetchPauseStr);
- }
- LOG.debug("fetchPause: {} ",fetchPause);
-
- this.consumer = MRClientFactory.createConsumer(properties);
+ this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
+ this.consumer.subscribe(consumerProperties.getProperty("topic"));
ready = true;
} catch (Exception e) {
- LOG.error("Error initializing DMaaP VES Message consumer from file {} {}",properties, e);
- }
- }
-
- private int parseTimeOutValue(String timeoutStr) {
- try {
- return Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout ({})",timeoutStr);
- }
- return timeout;
- }
-
- private int parseFetchPause(String fetchPauseStr) {
- try {
- return Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for fetchPause ({})",fetchPauseStr);
+ LOG.error("Error initializing Kafka Message consumer from file {} {}", consumerProperties, e);
}
- return fetchPause;
}
private void pauseThread() throws InterruptedException {
@@ -170,16 +132,15 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM
return running;
}
- public String getProperty(String name) {
- return properties.getProperty(name, "");
- }
-
+ /*
+ * public String getProperty(String name) { return properties.getProperty(name,
+ * ""); }
+ */
@Override
public void stopConsumer() {
running = false;
}
-
public String getBaseUrl() {
return generalConfig.getBaseUrl();
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
index 3626f534a..03573d85b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
@@ -23,34 +23,58 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPVESMsgConsumerMain implements Runnable {
+public class StrimziKafkaVESMsgConsumerMain implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
+ Properties strimziKafkaProperties = new Properties();
private static final String _PNFREG_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
private static final String _FAULT_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
private static final String _CM_CLASS =
"org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
+ private static final String _STNDDEFINED_FAULT_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
private static final String _PNFREG_DOMAIN = "pnfRegistration";
private static final String _FAULT_DOMAIN = "fault";
private static final String _CM_DOMAIN = "provisioning";
+ private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
boolean threadsRunning = false;
- List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
+ List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
private PNFRegistrationConfig pnfRegistrationConfig;
private FaultConfig faultConfig;
private GeneralConfig generalConfig;
private ProvisioningConfig provisioningConfig;
+ private StndDefinedFaultConfig stndDefinedFaultConfig;
+ private StrimziKafkaConfig strimziKafkaConfig;
- public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
+ public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
this.generalConfig = generalConfig;
configMap.forEach(this::initialize);
}
+ public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
+ StrimziKafkaConfig strimziKafkaConfig) {
+ this.generalConfig = generalConfig;
+ this.strimziKafkaConfig = strimziKafkaConfig;
+ configMap.forEach(this::initialize);
+ }
+
public void initialize(String domain, MessageConfig domainConfig) {
LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
String consumerClass;
@@ -60,12 +84,6 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerClass = _PNFREG_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- pnfRegistrationConfig.getTransportType());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- pnfRegistrationConfig.getHostPort());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- pnfRegistrationConfig.getContenttype());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
pnfRegistrationConfig.getConsumerGroup());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
@@ -76,61 +94,26 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
pnfRegistrationConfig.getFetchPause());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
- pnfRegistrationConfig.getProtocol());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
- pnfRegistrationConfig.getUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
- pnfRegistrationConfig.getPassword());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- pnfRegistrationConfig.getClientReadTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- pnfRegistrationConfig.getClientConnectTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- pnfRegistrationConfig.getHTTPProxyURI());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- pnfRegistrationConfig.getHTTPProxyUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- pnfRegistrationConfig.getHTTPProxyPassword());
-
- threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties);
+
+ threadsRunning =
+ createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
} else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
this.faultConfig = (FaultConfig) domainConfig;
consumerClass = _FAULT_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- faultConfig.getClientReadTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- faultConfig.getClientConnectTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- faultConfig.getHTTPProxyURI());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- faultConfig.getHTTPProxyUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- faultConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties);
+
+ threadsRunning =
+ createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
} else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
this.provisioningConfig = (ProvisioningConfig) domainConfig;
consumerClass = _CM_CLASS;
LOG.debug("Consumer class = {}", consumerClass);
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
- provisioningConfig.getTransportType());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
- provisioningConfig.getHostPort());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
- provisioningConfig.getContenttype());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
provisioningConfig.getConsumerGroup());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
@@ -139,26 +122,43 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
provisioningConfig.getFetchPause());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
- provisioningConfig.getClientReadTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
- provisioningConfig.getClientConnectTimeout());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI,
- provisioningConfig.getHTTPProxyURI());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER,
- provisioningConfig.getHTTPProxyUsername());
- consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD,
- provisioningConfig.getHTTPProxyPassword());
- threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties);
+
+ threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
+ } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
+ this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
+ consumerClass = _STNDDEFINED_FAULT_CLASS;
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ stndDefinedFaultConfig.getConsumerGroup());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
+ stndDefinedFaultConfig.getConsumerId());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
+ stndDefinedFaultConfig.getTopic());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
+ stndDefinedFaultConfig.getTimeout());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
+ stndDefinedFaultConfig.getLimit());
+ consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ stndDefinedFaultConfig.getFetchPause());
+
+ threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
+ getStrimziKafkaProps(strimziKafkaConfig));
+ }
+ }
+
+ private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
+ if (strimziKafkaProperties.size() == 0) {
+ strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
+ strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
+ strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
+ strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
}
+ return strimziKafkaProperties;
}
- private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
+ private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
boolean threadsRunning = false;
- for (DMaaPVESMsgConsumer consumer : consumers) {
+ for (StrimziKafkaVESMsgConsumer consumer : consumers) {
if (consumer.isRunning()) {
threadsRunning = true;
}
@@ -166,31 +166,33 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
return threadsRunning;
}
- public boolean createConsumer(String consumerType, Properties properties) {
- DMaaPVESMsgConsumerImpl consumer = null;
+ public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
+ StrimziKafkaVESMsgConsumerImpl consumer = null;
if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
- consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
- consumer = new DMaaPFaultVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
- consumer = new DMaaPCMVESMsgConsumer(generalConfig);
+ consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
+ else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
+ consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
- handleConsumer(consumer, properties, consumers);
+ handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
return !consumers.isEmpty();
}
- private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties,
- List<DMaaPVESMsgConsumer> consumers) {
+ private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
+ Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
if (consumer != null) {
- consumer.init(properties);
+ consumer.init(strimziKafkaProps, consumerProperties);
if (consumer.isReady()) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
consumers.add(consumer);
- LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties);
+ LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
return true;
} else {
LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
@@ -218,7 +220,7 @@ public class DMaaPVESMsgConsumerMain implements Runnable {
LOG.info("No listener threads running - exiting");
}
- public List<DMaaPVESMsgConsumer> getConsumers() {
+ public List<StrimziKafkaVESMsgConsumer> getConsumers() {
return consumers;
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java
index 0532334ef..b1bd2fca5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java
@@ -18,7 +18,7 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
-public interface DMaaPVESMsgValidator {
+public interface StrimziKafkaVESMsgValidator {
boolean isMessageValid(String message);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
new file mode 100644
index 000000000..b8dee44b0
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
@@ -0,0 +1,81 @@
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class VESMsgKafkaConsumer {
+
+ private static final Logger log = LoggerFactory.getLogger(VESMsgKafkaConsumer.class);
+ final KafkaConsumer<String, String> consumer;
+ private final int pollTimeout;
+ private String topicName;
+ private static final String DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer";
+
+ /**
+ *
+ * @param consumerProperties
+ * @param configuration The config provided to the client
+ */
+ public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
+ Properties props = new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
+ props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
+ props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG,
+ consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerGroup"));
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG,
+ consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerID"));
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS);
+ consumer = new KafkaConsumer<>(props);
+ pollTimeout = Integer.parseInt(consumerProperties.getProperty("timeout"));
+ }
+
+ /**
+ *
+ * @param topic The kafka topic to subscribe to
+ */
+ public void subscribe(String topic) {
+ try {
+ consumer.subscribe(Collections.singleton(topic));
+ this.topicName = topic;
+ } catch (InvalidGroupIdException e) {
+ log.error("Invalid Group {}", e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * @return The list of records returned from the poll
+ */
+ public List<String> poll() {
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout));
+ for (ConsumerRecord<String, String> rec : records) {
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java
index 98f02ec7a..9a01d53d1 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java
@@ -15,7 +15,7 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
public class CMBasicHeaderFieldsNotification {
private String cmNodeId;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java
index 014ff648d..8de9a21d0 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java
@@ -15,7 +15,7 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java
index 115f0f0c0..bdd83cce9 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java
@@ -16,7 +16,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST;
@@ -24,6 +24,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType;
+
public class CMNotificationClient extends MessageClient {
private static final String CM_NOTIFICATION_URI = "rests/operations/devicemanager:push-cm-notification";
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
index 8412e3730..c32d16273 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
@@ -16,23 +16,27 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Iterator;
import java.util.Map;
+
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
- public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
- LOG.info("DMaaPCMVESMsgConsumer started successfully");
+ LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
}
@Override
@@ -45,16 +49,16 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
- LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId);
processMoiChanges(rootNode);
} else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
- LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
} else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
- LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
} else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
- LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
+ LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId);
sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
} else {
LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java
index ce2538628..60a241831 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java
@@ -17,12 +17,14 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
+
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.*;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.*;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
index ee9d0220a..dc65732b4 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -25,15 +25,18 @@ import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
+
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPFaultVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
- public DMaaPFaultVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
}
@@ -48,32 +51,32 @@ public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
int faultSequence;
String reportingEntityName;
ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
+ JsonNode sKafkaMessageRootNode;
LOG.info("Fault VES Message is - {}", msg);
try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ sKafkaMessageRootNode = oMapper.readTree(msg);
+ reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
if (reportingEntityName.equals("ONAP SDN-R")) {
LOG.info(
- "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message");
+ "VES Fault message generated by SDNR, hence no need to process any further; Ignoring the received message");
return;
}
- vesDomain = dmaapMessageRootNode.at("/event/commonEventHeader/domain").textValue();
+ vesDomain = sKafkaMessageRootNode.at("/event/commonEventHeader/domain").textValue();
if (!vesDomain.equalsIgnoreCase("fault")) {
- LOG.warn("Received {} domain VES Message in DMaaP Fault topic, ignoring it", vesDomain);
+ LOG.warn("Received {} domain VES Message in Kafka Fault topic, ignoring it", vesDomain);
return;
}
- faultNodeId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ faultNodeId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
faultOccurrenceTime = Instant
.ofEpochMilli(
- dmaapMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ sKafkaMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
.atZone(ZoneId.of("Z")).toString();
- faultObjectId = dmaapMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
- faultReason = dmaapMessageRootNode.at("/event/faultFields/specificProblem").textValue();
- faultSeverity = dmaapMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
- faultSequence = dmaapMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
+ faultObjectId = sKafkaMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
+ faultReason = sKafkaMessageRootNode.at("/event/faultFields/specificProblem").textValue();
+ faultSeverity = sKafkaMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
+ faultSequence = sKafkaMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
if (faultSeverity.equalsIgnoreCase("critical")) {
faultSeverity = SeverityType.Critical.toString();
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java
index fd31a3fd6..70402a878 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.xml;
import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.PUT;
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNull;
import org.onap.ccsdk.features.sdnr.wt.common.database.requests.BaseRequest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient;
public class PNFMountPointClient extends MessageClient {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
index 51d6d1950..147202fb8 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
@@ -17,32 +17,34 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jdt.annotation.Nullable;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
+public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPPNFRegVESMsgConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaPNFRegVESMsgConsumer.class);
private static final String DEFAULT_PROTOCOL = "SSH";
private static final String DEFAULT_PORT = "17830";
private static final String DEFAULT_USERNAME = "netconf";
private static final String DEFAULT_PASSWORD = "netconf";
- public DMaaPPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
+ public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
super(generalConfig);
}
@Override
public void processMsg(String msg) {
- LOG.debug("Message from DMaaP topic is - {} ", msg);
+ LOG.debug("Message from Kafka topic is - {} ", msg);
String pnfId;
String pnfIPAddress;
@Nullable
@@ -57,33 +59,33 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
String pnfPasswd = null;
String reportingEntityName;
ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
+ JsonNode sKafkaMessageRootNode;
try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+ sKafkaMessageRootNode = oMapper.readTree(msg);
+ reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
if (reportingEntityName.equals("ONAP SDN-R")) {
LOG.info(
"VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message");
return;
}
- pnfId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
- pnfIPAddress = getPNFIPAddress(dmaapMessageRootNode);
+ pnfId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ pnfIPAddress = getPNFIPAddress(sKafkaMessageRootNode);
pnfCommProtocol =
- dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
- pnfCommPort = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
+ sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
+ pnfCommPort = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
if (pnfCommProtocol != null) {
if (pnfCommProtocol.equalsIgnoreCase("TLS")) {
// Read username and keyId
pnfKeyId =
- dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
+ pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
.textValue();
} else if (pnfCommProtocol.equalsIgnoreCase("SSH")) {
// Read username and password
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
.textValue();
- pnfPasswd = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password")
+ pnfPasswd = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password")
.textValue();
} else {
// log warning - Unknown protocol
@@ -139,12 +141,12 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
pnfCommPort == null || pnfUsername == null;
}
- private String getPNFIPAddress(JsonNode dmaapMessageRootNode) {
- String ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue();
+ private String getPNFIPAddress(JsonNode sKafkaMessageRootNode) {
+ String ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue();
if (ipAddress != null && ipAddress != "")
return ipAddress;
- ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
+ ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
if (ipAddress != null && ipAddress != "")
return ipAddress;
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
new file mode 100644
index 000000000..648809722
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
@@ -0,0 +1,141 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt mountpoint-registrar
+ * =================================================================================================
+ * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.FaultNotificationClient;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaStndDefinedFaultVESMsgConsumer.class);
+ Map<String, String> payloadMapMessage = null;
+ String faultNodeId;
+ String notificationType;
+
+ public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) {
+ super(generalConfig);
+ LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully");
+ }
+
+ /*
+ * Supports processing of notifyNewAlarm and notifyClearedAlarm messages ONLY
+ */
+ @Override
+ public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
+ LOG.debug("Processing StndDefined Fault message {}", msg);
+ JsonNode rootNode = convertMessageToJsonNode(msg);
+ try {
+
+ faultNodeId = rootNode.at("/event/commonEventHeader/sourceName").textValue();
+ notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
+
+ if (notificationType.equalsIgnoreCase("notifyNewAlarm")) {
+ LOG.info("Read stndDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
+ faultNodeId);
+ processNewAlarm(rootNode);
+ } else if (notificationType.equalsIgnoreCase("notifyClearedAlarm")) {
+ LOG.info("Read stdnDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
+ faultNodeId);
+ processClearedAlarm(rootNode);
+ } else {
+ LOG.warn(
+ "Read stdnDefined Fault message of type - {} with id {} from Kafka topic. No suitable implementation for processing this message",
+ notificationType, faultNodeId);
+ throw new InvalidMessageException();
+ }
+ // Send Fault Notification
+ String baseUrl = getBaseUrl();
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl);
+ LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+ faultClient.setAuthorization(sdnrUser, sdnrPasswd);
+ String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage);
+ faultClient.sendNotification(message);
+ } catch (NullPointerException e) {
+ LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
+ throw new InvalidMessageException("Missing field");
+ }
+
+ }
+
+ private void processClearedAlarm(JsonNode rootNode) {
+ String faultOccurrenceTime =
+ Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString();
+ int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
+ String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
+ String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue();
+ String faultSeverity =
+ getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue());
+
+ payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
+ Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
+
+ }
+
+ private void processNewAlarm(JsonNode rootNode) {
+ String faultOccurrenceTime =
+ Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+ .atZone(ZoneId.of("Z")).toString();
+ int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
+ String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
+ String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue();
+ String faultSeverity =
+ getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue());
+
+ payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
+ Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
+
+ }
+
+ /*
+ * 3GPP Definition PerceivedSeverity: type: string enum: - INDETERMINATE -
+ * CRITICAL - MAJOR - MINOR - WARNING - CLEARED
+ *
+ */
+ private String getSDNRSeverityType(String faultSeverity) {
+ if (faultSeverity.equalsIgnoreCase("critical")) {
+ faultSeverity = SeverityType.Critical.toString();
+ } else if (faultSeverity.equalsIgnoreCase("major")) {
+ faultSeverity = SeverityType.Major.toString();
+ } else if (faultSeverity.equalsIgnoreCase("minor")) {
+ faultSeverity = SeverityType.Minor.toString();
+ } else if (faultSeverity.equalsIgnoreCase("warning") || faultSeverity.equalsIgnoreCase("indeterminate")) {
+ faultSeverity = SeverityType.Warning.toString();
+ } else if (faultSeverity.equalsIgnoreCase("cleared")) {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ } else {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ }
+ return faultSeverity;
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMBasicHeaderFieldsNotification.java
index 5446da048..5ad73e25c 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMBasicHeaderFieldsNotification.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMBasicHeaderFieldsNotification.java
@@ -15,12 +15,12 @@
* the License.
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.client;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMBasicHeaderFieldsNotification;
public class TestCMBasicHeaderFieldsNotification {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationBuilder.java
index 3b74df321..9badc02cb 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationBuilder.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationBuilder.java
@@ -16,13 +16,13 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.client;
import static org.junit.Assert.*;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMBasicHeaderFieldsNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMNotification;
public class TestCMNotificationBuilder {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationClient.java
index 12ccd4c62..8cea25f1f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestCMNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestCMNotificationClient.java
@@ -16,7 +16,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.client;
import static org.junit.Assert.assertTrue;
@@ -24,9 +24,9 @@ import java.util.Map;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMBasicHeaderFieldsNotification;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotification;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.CMNotificationClient;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMBasicHeaderFieldsNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMNotification;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.CMNotificationClient;
public class TestCMNotificationClient extends CMNotificationClient {
public static String baseUrl = "http://localhost:8181";
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestFaultNotificationClient.java
index fa289aa4b..a33970269 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestFaultNotificationClient.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -25,7 +25,7 @@ import java.util.Map;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.FaultNotificationClient;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.FaultNotificationClient;
public class TestFaultNotificationClient extends FaultNotificationClient {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestPNFMountPointClient.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestPNFMountPointClient.java
index 0858a7faa..c43800044 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestPNFMountPointClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/client/TestPNFMountPointClient.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -25,7 +25,7 @@ import java.util.Map;
import javax.annotation.Nonnull;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.PNFMountPointClient;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.PNFMountPointClient;
public class TestPNFMountPointClient extends PNFMountPointClient {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/GeneralConfigForTest.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/GeneralConfigForTest.java
index 3413e9766..23c009a5b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/GeneralConfigForTest.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/GeneralConfigForTest.java
@@ -16,20 +16,19 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
public class GeneralConfigForTest implements AutoCloseable {
// @formatter:off
private static final String TESTCONFIG_CONTENT = "[general]\n"
- + "dmaapEnabled=false\n"
+ "baseUrl=http://localhost:8181\n"
+ "sdnrUser=admin\n"
+ "sdnrPasswd=admin\n"
@@ -39,7 +38,7 @@ public class GeneralConfigForTest implements AutoCloseable {
private GeneralConfig cfg ;
private String filename;
- GeneralConfigForTest(String filename) throws IOException {
+ public GeneralConfigForTest(String filename) throws IOException {
Files.asCharSink(new File(filename), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
ConfigurationFileRepresentation globalCfg = new ConfigurationFileRepresentation(filename);
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/PNFRegistrationConfigTest.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/PNFRegistrationConfigTest.java
index d3f4a13d6..b76e71372 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/PNFRegistrationConfigTest.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/PNFRegistrationConfigTest.java
@@ -16,7 +16,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
import static org.junit.Assert.assertEquals;
import com.google.common.io.Files;
@@ -26,29 +26,18 @@ import java.nio.charset.StandardCharsets;
import org.junit.After;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
public class PNFRegistrationConfigTest {
// @formatter:off
private static final String TESTCONFIG_CONTENT = "[pnfRegistration]\n"
- + "TransportType=HTTPNOAUTH\n"
- + "Protocol=http\n"
- + "username=username\n"
- + "password=password\n"
- + "host=onap-dmap:3904\n"
+ "topic=unauthenticated.VES_PNFREG_OUTPUT\n"
- + "contenttype=application/json\n"
- + "group=myG\n"
- + "id=C1\n"
+ + "consumerGroup=myG\n"
+ + "consumerID=C1\n"
+ "timeout=20000\n"
+ "limit=10000\n"
+ "fetchPause=5000\n"
- + "jersey.config.client.readTimeout=25000\n"
- + "jersey.config.client.connectTimeout=25000\n"
- + "jersey.config.client.proxy.uri=http://http-proxy\n"
- + "jersey.config.client.proxy.username=proxy-user\n"
- + "jersey.config.client.proxy.password=proxy-password\n"
+ "";
// @formatter:on
private ConfigurationFileRepresentation cfg;
@@ -61,24 +50,12 @@ public class PNFRegistrationConfigTest {
cfg = new ConfigurationFileRepresentation(configFile);
PNFRegistrationConfig pnfCfg = new PNFRegistrationConfig(cfg);
assertEquals("pnfRegistration", pnfCfg.getSectionName());
- assertEquals("HTTPNOAUTH", pnfCfg.getTransportType());
- assertEquals("onap-dmap:3904", pnfCfg.getHostPort());
assertEquals("unauthenticated.VES_PNFREG_OUTPUT", pnfCfg.getTopic());
- assertEquals("application/json", pnfCfg.getContenttype());
assertEquals("myG", pnfCfg.getConsumerGroup());
assertEquals("C1", pnfCfg.getConsumerId());
assertEquals("20000", pnfCfg.getTimeout());
assertEquals("10000", pnfCfg.getLimit());
assertEquals("5000", pnfCfg.getFetchPause());
- assertEquals("http", pnfCfg.getProtocol());
- assertEquals("username", pnfCfg.getUsername());
- assertEquals("password", pnfCfg.getPassword());
- assertEquals("25000", pnfCfg.getClientReadTimeout());
- assertEquals("25000", pnfCfg.getClientConnectTimeout());
- assertEquals("http://http-proxy", pnfCfg.getHTTPProxyURI());
- assertEquals("proxy-user", pnfCfg.getHTTPProxyUsername());
- assertEquals("proxy-password", pnfCfg.getHTTPProxyPassword());
-
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestFaultConfig.java
index 422d24935..8741370aa 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestFaultConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestFaultConfig.java
@@ -16,9 +16,9 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -26,35 +26,25 @@ import java.nio.charset.StandardCharsets;
import org.junit.After;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
public class TestFaultConfig {
// @formatter:off
private static final String TESTCONFIG_CONTENT = "[fault]\n"
- + "TransportType=HTTPNOAUTH\n"
- + "Protocol=http\n"
- + "username=username\n"
- + "password=password\n"
- + "host=onap-dmap:3904\n"
+ "topic=unauthenticated.SEC_FAULT_OUTPUT\n"
+ "contenttype=application/json\n"
- + "group=myG\n"
- + "id=C1\n"
+ + "consumerGroup=myG\n"
+ + "consumerID=C1\n"
+ "timeout=20000\n"
+ "limit=10000\n"
+ "fetchPause=5000\n"
- + "jersey.config.client.readTimeout=25000\n"
- + "jersey.config.client.connectTimeout=25000\n"
- + "jersey.config.client.proxy.uri=http://http-proxy\n"
- + "jersey.config.client.proxy.username=proxy-user\n"
- + "jersey.config.client.proxy.password=proxy-password\n"
- + "\n"
+ "";
- // @formatter:on
+ // @formatter:on
private ConfigurationFileRepresentation cfg;
private static final String CONFIGURATIONFILE = "test2.properties";
+
@Test
public void test() {
try {
@@ -62,24 +52,12 @@ public class TestFaultConfig {
cfg = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
FaultConfig faultCfg = new FaultConfig(cfg);
assertEquals("fault", faultCfg.getSectionName());
- assertEquals("HTTPNOAUTH", faultCfg.getTransportType());
- assertEquals("onap-dmap:3904", faultCfg.getHostPort());
assertEquals("unauthenticated.SEC_FAULT_OUTPUT", faultCfg.getTopic());
- assertEquals("application/json", faultCfg.getContenttype());
assertEquals("myG", faultCfg.getConsumerGroup());
assertEquals("C1", faultCfg.getConsumerId());
assertEquals("20000", faultCfg.getTimeout());
assertEquals("10000", faultCfg.getLimit());
assertEquals("5000", faultCfg.getFetchPause());
- assertEquals("http", faultCfg.getProtocol());
- assertEquals("username", faultCfg.getUsername());
- assertEquals("password", faultCfg.getPassword());
- assertEquals("25000", faultCfg.getClientReadTimeout());
- assertEquals("25000", faultCfg.getClientConnectTimeout());
- assertEquals("http://http-proxy", faultCfg.getHTTPProxyURI());
- assertEquals("proxy-user", faultCfg.getHTTPProxyUsername());
- assertEquals("proxy-password", faultCfg.getHTTPProxyPassword());
-
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestGeneralConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestGeneralConfig.java
index f73f3a5bf..9324f794d 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestGeneralConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestGeneralConfig.java
@@ -16,14 +16,14 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
public class TestGeneralConfig {
@@ -40,7 +40,6 @@ public class TestGeneralConfig {
public void test() throws IOException {
GeneralConfig cfg = config.getCfg();
- assertEquals(false, cfg.getEnabled());
assertEquals("http://localhost:8181", cfg.getBaseUrl());
assertEquals("admin", cfg.getSDNRUser());
assertEquals("admin", cfg.getSDNRPasswd());
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestProvisioningConfig.java
index 42c204aec..0a1ab241b 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestProvisioningConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestProvisioningConfig.java
@@ -16,43 +16,28 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
public class TestProvisioningConfig {
private static final String TESTCONFIG_CONTENT = "[provisioning]\n"
- + "TransportType=HTTPNOAUTH\n"
- + "Protocol=http\n"
- + "username=username\n"
- + "password=password\n"
- + "host=onap-dmap:3904\n"
+
+ "topic=unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT\n"
- + "contenttype=application/json\n"
- + "group=myG\n"
- + "id=C1\n"
+ + "consumerGroup=myG\n"
+ + "consumerID=C1\n"
+ "timeout=20000\n"
+ "limit=10000\n"
+ "fetchPause=5000\n"
- + "jersey.config.client.readTimeout=25000\n"
- + "jersey.config.client.connectTimeout=25000\n"
- + "jersey.config.client.proxy.uri=http://http-proxy\n"
- + "jersey.config.client.proxy.username=proxy-user\n"
- + "jersey.config.client.proxy.password=proxy-password\n"
+ "";
private static final String TEMP_DIR = System.getProperty("java.io.tmpdir");
@@ -65,23 +50,12 @@ public class TestProvisioningConfig {
ConfigurationFileRepresentation cfg = new ConfigurationFileRepresentation(configFile);
ProvisioningConfig provisioningConfig = new ProvisioningConfig(cfg);
assertEquals("provisioning", provisioningConfig.getSectionName());
- assertEquals("HTTPNOAUTH", provisioningConfig.getTransportType());
- assertEquals("onap-dmap:3904", provisioningConfig.getHostPort());
assertEquals("unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT", provisioningConfig.getTopic());
- assertEquals("application/json", provisioningConfig.getContenttype());
assertEquals("myG", provisioningConfig.getConsumerGroup());
assertEquals("C1", provisioningConfig.getConsumerId());
assertEquals("20000", provisioningConfig.getTimeout());
assertEquals("10000", provisioningConfig.getLimit());
assertEquals("5000", provisioningConfig.getFetchPause());
- assertEquals("http", provisioningConfig.getProtocol());
- assertEquals("username", provisioningConfig.getUsername());
- assertEquals("password", provisioningConfig.getPassword());
- assertEquals("25000", provisioningConfig.getClientReadTimeout());
- assertEquals("25000", provisioningConfig.getClientConnectTimeout());
- assertEquals("http://http-proxy", provisioningConfig.getHTTPProxyURI());
- assertEquals("proxy-user", provisioningConfig.getHTTPProxyUsername());
- assertEquals("proxy-password", provisioningConfig.getHTTPProxyPassword());
}
@After
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java
new file mode 100644
index 000000000..b3546ea06
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.junit.After;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
+
+public class TestStrimziKafkaConfig {
+
+ // @formatter:off
+ private static final String TESTCONFIG_CONTENT = "[strimzi-kafka]\n"
+ + "strimziEnabled=false\n"
+ + "bootstrapServers=onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094\n"
+ + "securityProtocol=PLAINTEXT\n"
+ + "saslMechanism=PLAIN\n"
+ + "saslJaasConfig=PLAIN\n"
+ + "";
+ // @formatter:on
+
+ private ConfigurationFileRepresentation cfg;
+ private static final String CONFIGURATIONFILE = "test2.properties";
+
+ @Test
+ public void test() {
+ try {
+ Files.asCharSink(new File(CONFIGURATIONFILE), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
+ cfg = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+ StrimziKafkaConfig sKafkaCfg = new StrimziKafkaConfig(cfg);
+ assertEquals("strimzi-kafka", sKafkaCfg.getSectionName());
+ assertEquals("onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094", sKafkaCfg.getBootstrapServers());
+ assertEquals("PLAINTEXT", sKafkaCfg.getSecurityProtocol());
+ assertEquals(false, sKafkaCfg.getEnabled());
+ assertEquals("PLAIN", sKafkaCfg.getSaslJaasConfig());
+ assertEquals("PLAIN", sKafkaCfg.getSaslMechanism());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @After
+ public void cleanUp() {
+ File file = new File(CONFIGURATIONFILE);
+ if (file.exists()) {
+ System.out.println("File exists, Deleting it");
+ file.delete();
+ }
+
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java
index 2c4fb647b..c3beb29f7 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java
@@ -16,46 +16,46 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.consumer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import com.fasterxml.jackson.core.JsonProcessingException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
-
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import java.util.Iterator;
import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
-public class TestDMaaPCMVESMsgConsumer {
+public class TestStrimziKafkaCMVESMsgConsumer {
private static final String CONFIGURATION_FILE = "cm_test.properties";
- private DMaaPCMVESMsgConsumer dMaaPCMVESMsgConsumer;
+ private StrimziKafkaCMVESMsgConsumer sKafkaCMVESMsgConsumer;
private GeneralConfigForTest generalConfigForTest;
@Before
public void setUp() throws Exception {
generalConfigForTest = new GeneralConfigForTest(CONFIGURATION_FILE);
- dMaaPCMVESMsgConsumer = new DMaaPCMVESMsgConsumer(generalConfigForTest.getCfg());
+ sKafkaCMVESMsgConsumer = new StrimziKafkaCMVESMsgConsumer(generalConfigForTest.getCfg());
}
@Test
public void processValidMsg() throws URISyntaxException, IOException {
- File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_valid.json").toURI());
+ File cmFileValid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_valid.json").toURI());
String cmEvent = readFileToString(cmFileValid);
try {
- dMaaPCMVESMsgConsumer.processMsg(cmEvent);
+ sKafkaCMVESMsgConsumer.processMsg(cmEvent);
} catch (Exception e) {
fail("Test fail with message: " + e.getMessage());
}
@@ -63,29 +63,29 @@ public class TestDMaaPCMVESMsgConsumer {
@Test(expected = InvalidMessageException.class)
public void processMsgThatMissesField() throws URISyntaxException, IOException, InvalidMessageException {
- File cmFileInvalid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_invalid.json").toURI());
+ File cmFileInvalid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_invalid.json").toURI());
String cmEvent = readFileToString(cmFileInvalid);
- dMaaPCMVESMsgConsumer.processMsg(cmEvent);
+ sKafkaCMVESMsgConsumer.processMsg(cmEvent);
}
@Test(expected = InvalidMessageException.class)
public void processMsgThatHasInvalidNotificationType()
throws URISyntaxException, IOException, InvalidMessageException {
- File cmFileInvalid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_invalid_type.json").toURI());
+ File cmFileInvalid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_invalid_type.json").toURI());
String cmEvent = readFileToString(cmFileInvalid);
- dMaaPCMVESMsgConsumer.processMsg(cmEvent);
+ sKafkaCMVESMsgConsumer.processMsg(cmEvent);
}
@Test(expected = JsonProcessingException.class)
public void processMsgThatIsNotValidJson() throws URISyntaxException, IOException, InvalidMessageException {
- File cmFileInvalid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/not_a_json.json").toURI());
+ File cmFileInvalid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/not_a_json.json").toURI());
String cmEvent = readFileToString(cmFileInvalid);
- dMaaPCMVESMsgConsumer.processMsg(cmEvent);
+ sKafkaCMVESMsgConsumer.processMsg(cmEvent);
}
@Test
public void processMsgWithOneElementMoiChangesArray() throws URISyntaxException, IOException {
- File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_valid.json").toURI());
+ File cmFileValid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_valid.json").toURI());
String cmEvent = readFileToString(cmFileValid);
try {
JsonNode rootNode = convertMessageToJsonNode(cmEvent);
@@ -93,7 +93,7 @@ public class TestDMaaPCMVESMsgConsumer {
.at("/event/stndDefinedFields/data/moiChanges")
.elements();
Map<String, String> payloadMap =
- dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+ sKafkaCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
assertEquals("0", payloadMap.get("@counter@"));
@@ -114,7 +114,7 @@ public class TestDMaaPCMVESMsgConsumer {
@Test
public void processMsgWithTwoElementMoiChangesArray() throws URISyntaxException, IOException {
File cmFileValid =
- new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_valid_two_element_moi_changes_array.json")
+ new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_valid_two_element_moi_changes_array.json")
.toURI());
String cmEvent = readFileToString(cmFileValid);
try {
@@ -123,7 +123,7 @@ public class TestDMaaPCMVESMsgConsumer {
.at("/event/stndDefinedFields/data/moiChanges")
.elements();
Map<String, String> payloadMap =
- dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+ sKafkaCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
assertEquals("0", payloadMap.get("@counter@"));
@@ -138,7 +138,7 @@ public class TestDMaaPCMVESMsgConsumer {
Map<String, String> payloadMap2 = null;
while (nodes.hasNext()) {
- payloadMap2 = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
+ payloadMap2 = sKafkaCMVESMsgConsumer.preparePayloadMapFromMoiChangesArray(rootNode, nodes);
}
assertEquals("samsung-O-DU-1122", payloadMap2.get("@node-id@"));
assertEquals("124", payloadMap2.get("@notification-id@"));
@@ -154,11 +154,11 @@ public class TestDMaaPCMVESMsgConsumer {
@Test
public void processMsgNotifyMoiCreationType() throws URISyntaxException, IOException {
- File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_creation.json").toURI());
+ File cmFileValid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_moi_creation.json").toURI());
String cmEvent = readFileToString(cmFileValid);
try {
JsonNode rootNode = convertMessageToJsonNode(cmEvent);
- Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
+ Map<String, String> payloadMap = sKafkaCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
assertEquals("0", payloadMap.get("@counter@"));
assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
@@ -177,11 +177,11 @@ public class TestDMaaPCMVESMsgConsumer {
@Test
public void processMsgNotifyMoiDeletionType() throws URISyntaxException, IOException {
- File cmFileValid = new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_deletion.json").toURI());
+ File cmFileValid = new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_moi_deletion.json").toURI());
String cmEvent = readFileToString(cmFileValid);
try {
JsonNode rootNode = convertMessageToJsonNode(cmEvent);
- Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
+ Map<String, String> payloadMap = sKafkaCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList");
assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
assertEquals("0", payloadMap.get("@counter@"));
assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
@@ -201,11 +201,11 @@ public class TestDMaaPCMVESMsgConsumer {
@Test
public void processMsgNotifyMoiAttributeValueChangesType() throws URISyntaxException, IOException {
File cmFileValid =
- new File(TestDMaaPCMVESMsgConsumer.class.getResource("/msgs/cm_moi_attribute_value_changes.json").toURI());
+ new File(TestStrimziKafkaCMVESMsgConsumer.class.getResource("/msgs/cm_moi_attribute_value_changes.json").toURI());
String cmEvent = readFileToString(cmFileValid);
try {
JsonNode rootNode = convertMessageToJsonNode(cmEvent);
- Map<String, String> payloadMap = dMaaPCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges");
+ Map<String, String> payloadMap = sKafkaCMVESMsgConsumer.preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges");
assertEquals("samsung-O-DU-1122", payloadMap.get("@node-id@"));
assertEquals("0", payloadMap.get("@counter@"));
assertEquals("2019-01-09T12:30:07.722Z", payloadMap.get("@timestamp@"));
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java
index cf25e1e7b..912b73584 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java
@@ -16,15 +16,16 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.consumer;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
-public class TestDMaaPFaultVESMsgConsumer {
+public class TestStrimziKafkaFaultVESMsgConsumer {
private static final String DEFAULT_SDNRUSER = "admin";
private static final String DEFAULT_SDNRPASSWD = "admin";
@@ -121,6 +122,7 @@ public class TestDMaaPFaultVESMsgConsumer {
public void before() throws IOException {
cfgTest = new GeneralConfigForTest(CONFIGURATIONFILE);
}
+
@After
public void after() {
cfgTest.close();
@@ -129,7 +131,7 @@ public class TestDMaaPFaultVESMsgConsumer {
@Test
public void test() throws IOException {
- DMaaPFaultVESMsgConsumer faultMsgConsumer = new DMaaPFaultVESMsgConsumer(cfgTest.getCfg());
+ StrimziKafkaFaultVESMsgConsumer faultMsgConsumer = new StrimziKafkaFaultVESMsgConsumer(cfgTest.getCfg());
try {
faultMsgConsumer.processMsg(faultVESMsg.replace("@eventSeverity@", "CRITICAL"));
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java
index 2c07caa1c..20b6c4ae7 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java
@@ -16,15 +16,16 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.consumer;
import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
-public class TestDMaaPPNFRegVESMsgConsumer {
+public class TestStrimziKafkaPNFRegVESMsgConsumer {
private static final String DEFAULT_SDNRUSER = "admin";
private static final String DEFAULT_SDNRPASSWD = "admin";
@@ -247,7 +248,7 @@ public class TestDMaaPPNFRegVESMsgConsumer {
@Test
public void processMsgTest() {
- DMaaPPNFRegVESMsgConsumer pnfRegMsgConsumer = new DMaaPPNFRegVESMsgConsumer(cfgTest.getCfg());
+ StrimziKafkaPNFRegVESMsgConsumer pnfRegMsgConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg());
try {
pnfRegMsgConsumer.processMsg(pnfRegMsg);
pnfRegMsgConsumer.processMsg(pnfRegMsg_SSH);
@@ -261,7 +262,7 @@ public class TestDMaaPPNFRegVESMsgConsumer {
@Test
public void Test1() {
- DMaaPPNFRegVESMsgConsumer pnfConsumer = new DMaaPPNFRegVESMsgConsumer(cfgTest.getCfg());
+ StrimziKafkaPNFRegVESMsgConsumer pnfConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg());
System.out.println(pnfConsumer.getBaseUrl());
System.out.println(pnfConsumer.getSDNRUser());
System.out.println(pnfConsumer.getSDNRPasswd());
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java
new file mode 100644
index 000000000..0185bf687
--- /dev/null
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java
@@ -0,0 +1,239 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2023 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.consumer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
+
+public class TestStrimziKafkaStndDefinedVESMsgConsumer {
+
+ private static final String CONFIGURATIONFILE = "test2.properties";
+
+ // @formatter:off
+ private static final String stndDefinedVESMsg_NotifyNewAlarm =
+ "{\n"
+ + " \"event\": {\n"
+ + " \"commonEventHeader\": {\n"
+ + " \"startEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventId\": \"stndDefined000000001\",\n"
+ + " \"timeZoneOffset\": \"+00:00\",\n"
+ + " \"internalHeaderFields\": {\n"
+ + " \"collectorTimeStamp\": \"Mon, 11 21 2022 09:20:30 UTC\"\n"
+ + " },\n"
+ + " \"eventType\": \"5GCell-NodeH_Alarms\",\n"
+ + " \"priority\": \"Low\",\n"
+ + " \"version\": \"4.1\",\n"
+ + " \"nfVendorName\": \"NodeH\",\n"
+ + " \"reportingEntityName\": \"NodeH-5GCell-1234\",\n"
+ + " \"sequence\": 5,\n"
+ + " \"domain\": \"stndDefined\",\n"
+ + " \"lastEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventName\": \"StndDefined_5GCell-NodeH_Alarms_MyAlarm\",\n"
+ + " \"vesEventListenerVersion\": \"7.2.1\",\n"
+ + " \"sourceName\": \"NodeH-5GCell-1234\",\n"
+ + " \"stndDefinedNamespace\": \"3GPP-FaultSupervision\",\n"
+ + " \"nfNamingCode\": \"5GCell\"\n"
+ + " },\n"
+ + " \"stndDefinedFields\": {\n"
+ + " \"stndDefinedFieldsVersion\": \"1.0\",\n"
+ + " \"data\": {\n"
+ + " \"additionalInformation\": {\n"
+ + " \"equipType\": \"5GCell\",\n"
+ + " \"vendor\": \"NodeH\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"model\": \"SF1234\"\n"
+ + " },\n"
+ + " \"backedUpStatus\": false,\n"
+ + " \"rootCauseIndicator\": false,\n"
+ + " \"notificationType\": \"notifyNewAlarm\",\n"
+ + " \"systemDN\": \"DC=com.Node-H,CN=5GCell\",\n"
+ + " \"alarmType\": \"COMMUNICATIONS_ALARM\",\n"
+ + " \"probableCause\": \"My cause\",\n"
+ + " \"perceivedSeverity\": \"@eventSeverity@\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"alarmId\": \"MyAlarm\",\n"
+ + " \"proposedRepairActions\": \"Repair me\",\n"
+ + " \"notificationId\": 0,\n"
+ + " \"href\": \"http://10.0.33.23/3GPPManagement/FaultSupervisionMnS/17.1.0\"\n"
+ + " },\n"
+ + " \"schemaReference\": \"https://forge.3gpp.org/rep/sa5/MnS/-/blob/Rel-18/OpenAPI/TS28532_FaultMnS.yaml#components/schemas/NotifyNewAlarm\"\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+ // @formatter:on
+ // @formatter:off
+ private static final String stndDefinedVESMsg_NotifyClearedAlarm = "{\n"
+ + " \"event\": {\n"
+ + " \"commonEventHeader\": {\n"
+ + " \"startEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventId\": \"stndDefined000000001\",\n"
+ + " \"timeZoneOffset\": \"+00:00\",\n"
+ + " \"internalHeaderFields\": {\n"
+ + " \"collectorTimeStamp\": \"Mon, 11 21 2022 09:20:30 UTC\"\n"
+ + " },\n"
+ + " \"eventType\": \"5GCell-NodeH_Alarms\",\n"
+ + " \"priority\": \"Low\",\n"
+ + " \"version\": \"4.1\",\n"
+ + " \"nfVendorName\": \"NodeH\",\n"
+ + " \"reportingEntityName\": \"NodeH-5GCell-1234\",\n"
+ + " \"sequence\": 5,\n"
+ + " \"domain\": \"stndDefined\",\n"
+ + " \"lastEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventName\": \"StndDefined_5GCell-NodeH_Alarms_MyAlarm\",\n"
+ + " \"vesEventListenerVersion\": \"7.2.1\",\n"
+ + " \"sourceName\": \"NodeH-5GCell-1234\",\n"
+ + " \"stndDefinedNamespace\": \"3GPP-FaultSupervision\",\n"
+ + " \"nfNamingCode\": \"5GCell\"\n"
+ + " },\n"
+ + " \"stndDefinedFields\": {\n"
+ + " \"stndDefinedFieldsVersion\": \"1.0\",\n"
+ + " \"data\": {\n"
+ + " \"additionalInformation\": {\n"
+ + " \"equipType\": \"5GCell\",\n"
+ + " \"vendor\": \"NodeH\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"model\": \"SF1234\"\n"
+ + " },\n"
+ + " \"backedUpStatus\": false,\n"
+ + " \"rootCauseIndicator\": false,\n"
+ + " \"notificationType\": \"notifyClearedAlarm\",\n"
+ + " \"systemDN\": \"DC=com.Node-H,CN=5GCell\",\n"
+ + " \"alarmType\": \"COMMUNICATIONS_ALARM\",\n"
+ + " \"probableCause\": \"My cause\",\n"
+ + " \"perceivedSeverity\": \"@eventSeverity@\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"alarmId\": \"MyAlarm\",\n"
+ + " \"proposedRepairActions\": \"Repair me\",\n"
+ + " \"notificationId\": 0,\n"
+ + " \"href\": \"http://10.0.33.23/3GPPManagement/FaultSupervisionMnS/17.1.0\"\n"
+ + " },\n"
+ + " \"schemaReference\": \"https://forge.3gpp.org/rep/sa5/MnS/-/blob/Rel-18/OpenAPI/TS28532_FaultMnS.yaml#components/schemas/NotifyClearedAlarm\"\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+ // @formatter:on
+
+ // @formatter:off
+ private static final String stndDefinedVESMsg_Invalid = "{\n"
+ + " \"event\": {\n"
+ + " \"commonEventHeader\": {\n"
+ + " \"startEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventId\": \"stndDefined000000001\",\n"
+ + " \"timeZoneOffset\": \"+00:00\",\n"
+ + " \"internalHeaderFields\": {\n"
+ + " \"collectorTimeStamp\": \"Mon, 11 21 2022 09:20:30 UTC\"\n"
+ + " },\n"
+ + " \"eventType\": \"5GCell-NodeH_Alarms\",\n"
+ + " \"priority\": \"Low\",\n"
+ + " \"version\": \"4.1\",\n"
+ + " \"nfVendorName\": \"NodeH\",\n"
+ + " \"reportingEntityName\": \"NodeH-5GCell-1234\",\n"
+ + " \"sequence\": 5,\n"
+ + " \"domain\": \"stndDefined\",\n"
+ + " \"lastEpochMicrosec\": 1669022429000000,\n"
+ + " \"eventName\": \"StndDefined_5GCell-NodeH_Alarms_MyAlarm\",\n"
+ + " \"vesEventListenerVersion\": \"7.2.1\",\n"
+ + " \"sourceName\": \"NodeH-5GCell-1234\",\n"
+ + " \"stndDefinedNamespace\": \"3GPP-FaultSupervision\",\n"
+ + " \"nfNamingCode\": \"5GCell\"\n"
+ + " },\n"
+ + " \"stndDefinedFields\": {\n"
+ + " \"stndDefinedFieldsVersion\": \"1.0\",\n"
+ + " \"data\": {\n"
+ + " \"additionalInformation\": {\n"
+ + " \"equipType\": \"5GCell\",\n"
+ + " \"vendor\": \"NodeH\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"model\": \"SF1234\"\n"
+ + " },\n"
+ + " \"backedUpStatus\": false,\n"
+ + " \"rootCauseIndicator\": false,\n"
+ + " \"notificationType\": \"notifyChangedAlarm\",\n"
+ + " \"systemDN\": \"DC=com.Node-H,CN=5GCell\",\n"
+ + " \"alarmType\": \"COMMUNICATIONS_ALARM\",\n"
+ + " \"probableCause\": \"My cause\",\n"
+ + " \"perceivedSeverity\": \"@eventSeverity@\",\n"
+ + " \"eventTime\": \"2022-11-21T09:20:29Z\",\n"
+ + " \"alarmId\": \"MyAlarm\",\n"
+ + " \"proposedRepairActions\": \"Repair me\",\n"
+ + " \"notificationId\": 0,\n"
+ + " \"href\": \"http://10.0.33.23/3GPPManagement/FaultSupervisionMnS/17.1.0\"\n"
+ + " },\n"
+ + " \"schemaReference\": \"https://forge.3gpp.org/rep/sa5/MnS/-/blob/Rel-18/OpenAPI/TS28532_FaultMnS.yaml#components/schemas/NotifyClearedAlarm\"\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+ // @formatter:on
+ private GeneralConfigForTest cfgTest;
+
+ @Before
+ public void before() throws IOException {
+ cfgTest = new GeneralConfigForTest(CONFIGURATIONFILE);
+ }
+
+ @After
+ public void after() {
+ cfgTest.close();
+ }
+
+
+ @Test
+ public void testNotifyNewAlarm() throws IOException {
+ StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+ try {
+
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "CRITICAL"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Major"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "minor"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "NonAlarmed"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "warning"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Unknown"));
+ //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete);
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Exception while processing Fault Message - " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNotifyClearedAlarm() throws IOException {
+ StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+ try {
+
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "cleared"));
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "Indeterminate"));
+ //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete);
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Exception while processing Fault Message - " + e.getMessage());
+ }
+ }
+
+ @Test(expected = InvalidMessageException.class)
+ public void testInvalidStndDefinedMessage() throws InvalidMessageException, JsonProcessingException {
+ StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+ stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_Invalid.replace("@eventSeverity@", "cleared"));
+ }
+}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaVESMsgConsumerMain.java
index ecfb8d081..d218d0d6f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/TestDMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaVESMsgConsumerMain.java
@@ -17,7 +17,7 @@
* ============LICENSE_END==========================================================================
*/
-package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test;
+package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.consumer;
import static org.junit.Assert.assertNotNull;
import com.google.common.io.Files;
@@ -29,9 +29,14 @@ import java.util.Map;
import org.junit.After;
import org.junit.Test;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.*;
-public class TestDMaaPVESMsgConsumerMain {
+public class TestStrimziKafkaVESMsgConsumerMain {
private static final String CONFIGURATIONFILE = "test1.properties";
private static final String TESTCONFIG_GENERAL = "[general]\n"
@@ -97,7 +102,7 @@ public class TestDMaaPVESMsgConsumerMain {
+ "";
public GeneralConfig generalConfig;
Map<String, MessageConfig> configMap = new HashMap<>();
- DMaaPVESMsgConsumerMain dmaapMain;
+ StrimziKafkaVESMsgConsumerMain dmaapMain;
public void preTest1() {
@@ -142,14 +147,14 @@ public class TestDMaaPVESMsgConsumerMain {
public void testDMaaPVESMsgConsumerMainMapOfStringConfiguration() {
preTest1();
assertNotNull(configMap);
- dmaapMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
+// dmaapMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
}
@Test
public void testDMaaPVESMsgConsumerMainMapOfStringConfiguration1() {
preTest2();
assertNotNull(configMap);
- dmaapMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
+// dmaapMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
}
@After
@@ -159,11 +164,11 @@ public class TestDMaaPVESMsgConsumerMain {
System.out.println("File exists, Deleting it");
file.delete();
}
- List<DMaaPVESMsgConsumer> consumers = dmaapMain.getConsumers();
- for (DMaaPVESMsgConsumer consumer : consumers) {
- // stop all consumers
- consumer.stopConsumer();
- }
+// List<DMaaPVESMsgConsumer> consumers = dmaapMain.getConsumers();
+// for (DMaaPVESMsgConsumer consumer : consumers) {
+// // stop all consumers
+// consumer.stopConsumer();
+// }
}
}