diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2023-03-14 18:02:29 +0530 |
---|---|---|
committer | KAPIL SINGAL <ks220y@att.com> | 2023-03-14 14:07:58 +0000 |
commit | 3323a01bc3633dd723c1c7e9ad9488f89029bd1f (patch) | |
tree | 8c2f8aa84bc11615dbd990b97e9657ab4154b9f4 /sdnr/wt/mountpoint-registrar/provider/src/main/java | |
parent | dc75699b17dfd4af719127e2e6f67d49b06e04e0 (diff) |
Use Strimzi Kafka and Kafka native APIs
dmaapClient library is no longer used as DMaaP-MR is being deprecated
Issue-ID: CCSDK-3784
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: I12b9b7c8c57ad983a162e04ad8e76a57978fa9ee
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java')
24 files changed, 680 insertions, 453 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java index 7c71f7ee7..42180bd44 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/FaultConfig.java @@ -16,15 +16,13 @@ * the License. * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; public class FaultConfig extends MessageConfig { private static final String SECTION_MARKER = "fault"; - private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_FAULT_TOPIC_USERNAME}"; - private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_FAULT_TOPIC_PASSWORD}"; private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_FAULT_OUTPUT"; @@ -32,10 +30,6 @@ public class FaultConfig extends MessageConfig { super(configuration); sectionMarker = SECTION_MARKER; super.configuration.addSection(SECTION_MARKER); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, - DEFAULT_VALUE_CONSUMER_USERNAME); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, - DEFAULT_VALUE_CONSUMER_PASSWORD); super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, DEFAULT_VALUE_CONSUMER_TOPIC); defaults(); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java index eec4e7a9e..a8f920497 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/GeneralConfig.java @@ -15,25 +15,18 @@ * the License. * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; /** * Configuration of mountpoint-registrar, general section<br> - * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default - * Configuration properties if none exist or exist partially Generates Consumer properties only for - * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not - * generated by default. For a list of applicable properties for the different TranportType values, please see - - * https://wiki.onap.org/display/DW/Feature+configuration+requirements */ public class GeneralConfig implements Configuration { private static final String SECTION_MARKER = "general"; - private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; - private static final String PROPERTY_KEY_USER = "sdnrUser"; private static final String DEFAULT_VALUE_USER = "${SDNRUSERNAME}"; @@ -52,10 +45,6 @@ public class GeneralConfig implements Configuration { defaults(); } - public Boolean getEnabled() { - return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); - } - public String getBaseUrl() { return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BASEURL); } @@ -75,8 +64,6 @@ public class GeneralConfig implements Configuration { @Override public void defaults() { - // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BASEURL, DEFAULT_VALUE_BASEURL); configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USER, DEFAULT_VALUE_USER); configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USERPASSWD, DEFAULT_VALUE_USERPASSWD); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java new file mode 100644 index 000000000..3b3394454 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/MessageConfig.java @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; + +import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; + + +public abstract class MessageConfig implements Configuration { + protected String sectionMarker; + + public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic"; + + public static final String PROPERTY_KEY_CONSUMER_GROUP = "consumerGroup"; + private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG"; + + public static final String PROPERTY_KEY_CONSUMER_ID = "consumerID"; + private static final String DEFAULT_VALUE_CONSUMER_ID = "C1"; + + public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout"; + private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000"; + + public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit"; + private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000"; + + public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause"; + private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000"; + + protected ConfigurationFileRepresentation configuration; + + public MessageConfig(ConfigurationFileRepresentation configuration) { + this.configuration = configuration; + } + + @Override + public String getSectionName() { + return sectionMarker; + } + + @Override + public void defaults() { + configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP, + DEFAULT_VALUE_CONSUMER_GROUP); + configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID); + configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT, + DEFAULT_VALUE_CONSUMER_TIMEOUT); + configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT, + DEFAULT_VALUE_CONSUMER_LIMIT); + configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE, + DEFAULT_VALUE_CONSUMER_FETCHPAUSE); + } + + public String getTopic() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC); + } + + public String getConsumerGroup() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP); + } + + public String getConsumerId() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID); + } + + public String getTimeout() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT); + } + + public String getLimit() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT); + } + + public String getFetchPause() { + return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE); + } + +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java index acc0b3a89..a2292f576 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/PNFRegistrationConfig.java @@ -16,15 +16,13 @@ * the License. * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; public class PNFRegistrationConfig extends MessageConfig { private static final String SECTION_MARKER = "pnfRegistration"; - private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_PNFREG_TOPIC_USERNAME}"; - private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_PNFREG_TOPIC_PASSWORD}"; private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.VES_PNFREG_OUTPUT"; @@ -32,10 +30,6 @@ public class PNFRegistrationConfig extends MessageConfig { super(configuration); sectionMarker = SECTION_MARKER; super.configuration.addSection(SECTION_MARKER); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, - DEFAULT_VALUE_CONSUMER_USERNAME); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, - DEFAULT_VALUE_CONSUMER_PASSWORD); super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, DEFAULT_VALUE_CONSUMER_TOPIC); defaults(); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java index 91a1f3fbe..a2119dbc5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/ProvisioningConfig.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/ProvisioningConfig.java @@ -16,24 +16,18 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; public class ProvisioningConfig extends MessageConfig { private static final String SECTION_MARKER = "provisioning"; - private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "${DMAAP_CM_TOPIC_USERNAME}"; - private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "${DMAAP_CM_TOPIC_PASSWORD}"; private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT"; public ProvisioningConfig(ConfigurationFileRepresentation configuration) { super(configuration); sectionMarker = SECTION_MARKER; super.configuration.addSection(SECTION_MARKER); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, - DEFAULT_VALUE_CONSUMER_USERNAME); - super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, - DEFAULT_VALUE_CONSUMER_PASSWORD); super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, DEFAULT_VALUE_CONSUMER_TOPIC); defaults(); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java new file mode 100644 index 000000000..0a1381c2a --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StndDefinedFaultConfig.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; + +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; + +public class StndDefinedFaultConfig extends MessageConfig { + + private static final String SECTION_MARKER = "stndDefinedFault"; + private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT"; + + public StndDefinedFaultConfig(ConfigurationFileRepresentation configuration) { + super(configuration); + sectionMarker = SECTION_MARKER; + super.configuration.addSection(SECTION_MARKER); + super.configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, + DEFAULT_VALUE_CONSUMER_TOPIC); + defaults(); + } + +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java new file mode 100644 index 000000000..41ab8a7cb --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/config/StrimziKafkaConfig.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config; + +import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; + +/* + * [strimzi-kafka] + * bootstrapServers=abc:9092,def:9092 + * securityProtocol=PLAINTEXT #OTHER POSSIBLE VALUES - SSL, SASL_PLAINTEXT, SASL_SSL + * saslMechanism=PLAIN #Need to understand more + * saslJaasConfig= + * consumerGroup= + * consumerID= + */ +public class StrimziKafkaConfig implements Configuration { + + private static final String SECTION_MARKER = "strimzi-kafka"; + + private static final String PROPERTY_KEY_ENABLED = "strimziEnabled"; + + private static final String PROPERTY_KEY_BOOTSTRAPSERVERS = "bootstrapServers"; + private static final String DEFAULT_VALUE_BOOTSTRAPSERVERS = "onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094"; + + private static final String PROPERTY_KEY_SECURITYPROTOCOL = "securityProtocol"; + private static final String DEFAULT_VALUE_SECURITYPROTOCOL = "PLAINTEXT"; + + private static final String PROPERTY_KEY_SASLMECHANISM = "saslMechanism"; + private static final String DEFAULT_VALUE_SASLMECHANISM = "PLAIN"; + + private static final String PROPERTY_KEY_SASLJAASCONFIG = "saslJaasConfig"; + private static final String DEFAULT_VALUE_SASLJAASCONFIG = "PLAIN"; // TBD + + private ConfigurationFileRepresentation configuration; + + public StrimziKafkaConfig(ConfigurationFileRepresentation configuration) { + this.configuration = configuration; + configuration.addSection(SECTION_MARKER); + defaults(); + } + + public Boolean getEnabled() { + return configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); + } + + public String getBootstrapServers() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS); + } + + public String getSecurityProtocol() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL); + } + + public String getSaslMechanism() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM); + } + + public String getSaslJaasConfig() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG); + } + + @Override + public String getSectionName() { + return SECTION_MARKER; + } + + @Override + public void defaults() { + // The default value should be "false" given that SDNR can be run in + // environments where Strimzi is not used + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BOOTSTRAPSERVERS, + DEFAULT_VALUE_BOOTSTRAPSERVERS); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SECURITYPROTOCOL, + DEFAULT_VALUE_SECURITYPROTOCOL); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLMECHANISM, + DEFAULT_VALUE_SASLMECHANISM); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_SASLJAASCONFIG, + DEFAULT_VALUE_SASLJAASCONFIG); + } + +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java index 584982a5b..d9d487257 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageClient.java @@ -1,5 +1,10 @@ /* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= * Copyright (C) 2021 Samsung Electronics + * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -9,6 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License + * ============LICENSE_END========================================================================== */ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; @@ -31,11 +37,11 @@ public abstract class MessageClient extends BaseHTTPClient { protected final Map<String, String> headerMap; private String notificationUri; - protected enum SendMethod { + public /* protected */ enum SendMethod { PUT, POST } - protected enum MessageType { + public /* protected */ enum MessageType { xml, json } @@ -95,7 +101,7 @@ public abstract class MessageClient extends BaseHTTPClient { try { response = sendRequest(notificationUri, method.toString(), message, headerMap); } catch (IOException e) { - LOG.warn("Problem sending fault message: {}", e.getMessage()); + LOG.warn("Problem sending message: {}", e.getMessage()); return false; } LOG.debug("Finished with response code {}", response.code); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java deleted file mode 100644 index 8a6f6442e..000000000 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MessageConfig.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt mountpoint-registrar - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ - - -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; - -public abstract class MessageConfig implements Configuration { - protected String sectionMarker; - - public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType"; - private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH"; - - public static final String PROPERTY_KEY_CONSUMER_PROTOCOL = "Protocol"; - private static final String DEFAULT_VALUE_CONSUMER_PROTOCOL = "http"; - - public static final String PROPERTY_KEY_CONSUMER_USERNAME = "username"; - public static final String PROPERTY_KEY_CONSUMER_PASSWORD = "password"; - - public static final String PROPERTY_KEY_CONSUMER_HOST_PORT = "host"; - private static final String DEFAULT_VALUE_CONSUMER_HOST_PORT = "onap-dmaap:3904"; - - public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic"; - - public static final String PROPERTY_KEY_CONSUMER_CONTENTTYPE = "contenttype"; - private static final String DEFAULT_VALUE_CONSUMER_CONTENTTYPE = "application/json"; - - public static final String PROPERTY_KEY_CONSUMER_GROUP = "group"; - private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG"; - - public static final String PROPERTY_KEY_CONSUMER_ID = "id"; - private static final String DEFAULT_VALUE_CONSUMER_ID = "C1"; - - public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout"; - private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000"; - - public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit"; - private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000"; - - public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause"; - private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000"; - - public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT = "jersey.config.client.readTimeout"; - private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT = "25000"; - - public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT = "jersey.config.client.connectTimeout"; - private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT = "25000"; - - public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "jersey.config.client.proxy.username"; - private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER = "${HTTP_PROXY_USERNAME}"; - - public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "jersey.config.client.proxy.password"; - private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD = "${HTTP_PROXY_PASSWORD}"; - - public static final String PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI = "jersey.config.client.proxy.uri"; - private static final String DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI = "${HTTP_PROXY_URI}"; - - protected ConfigurationFileRepresentation configuration; - - public MessageConfig(ConfigurationFileRepresentation configuration) { - this.configuration = configuration; - } - - @Override - public String getSectionName() { - return sectionMarker; - } - - @Override - public void defaults() { - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, - DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL, - DEFAULT_VALUE_CONSUMER_PROTOCOL); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT, - DEFAULT_VALUE_CONSUMER_HOST_PORT); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE, - DEFAULT_VALUE_CONSUMER_CONTENTTYPE); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP, - DEFAULT_VALUE_CONSUMER_GROUP); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT, - DEFAULT_VALUE_CONSUMER_TIMEOUT); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT, - DEFAULT_VALUE_CONSUMER_LIMIT); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE, - DEFAULT_VALUE_CONSUMER_FETCHPAUSE); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD); - configuration.setPropertyIfNotAvailable(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - DEFAULT_VALUE_CONSUMER_CLIENT_HTTPPROXY_URI); - } - - - - public String getHostPort() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_HOST_PORT); - } - - public String getTransportType() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE); - } - - public String getProtocol() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PROTOCOL); - } - - public String getUsername() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_USERNAME); - } - - public String getPassword() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_PASSWORD); - } - - public String getTopic() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TOPIC); - } - - public String getConsumerGroup() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_GROUP); - } - - public String getConsumerId() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_ID); - } - - public String getTimeout() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_TIMEOUT); - } - - public String getLimit() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_LIMIT); - } - - public String getFetchPause() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_FETCHPAUSE); - } - - public String getContenttype() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CONTENTTYPE); - } - - public String getClientReadTimeout() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT); - } - - public String getClientConnectTimeout() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT); - } - - public String getHTTPProxyURI() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI); - } - - public String getHTTPProxyUsername() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER); - } - - public String getHTTPProxyPassword() { - return configuration.getProperty(sectionMarker, PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD); - } -} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java index 136d2a12b..32d68ee62 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java @@ -24,6 +24,13 @@ import java.util.List; import java.util.Map; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,12 +40,13 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis private static final String APPLICATION_NAME = "mountpoint-registrar"; private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties"; - private Thread dmaapVESMsgConsumerMain = null; + private Thread sKafkaVESMsgConsumerMain = null; private GeneralConfig generalConfig; - private boolean dmaapEnabled = false; + private boolean strimziEnabled = false; private Map<String, MessageConfig> configMap = new HashMap<>(); - private DMaaPVESMsgConsumerMain dmaapConsumerMain = null; + private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null; + private StrimziKafkaConfig strimziKafkaConfig; // Blueprint 1 public MountpointRegistrarImpl() { @@ -53,22 +61,25 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis configFileRepresentation.registerConfigChangedListener(this); generalConfig = new GeneralConfig(configFileRepresentation); + strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation); PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation); FaultConfig faultConfig = new FaultConfig(configFileRepresentation); ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation); + StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation); configMap.put("pnfRegistration", pnfRegConfig); configMap.put("fault", faultConfig); configMap.put("provisioning", provisioningConfig); - - dmaapEnabled = generalConfig.getEnabled(); - if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true - LOG.info("DMaaP seems to be enabled, starting consumer(s)"); - dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig); - dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain); - dmaapVESMsgConsumerMain.start(); + configMap.put("stndDefinedFault", stndFaultConfig); + + strimziEnabled = strimziKafkaConfig.getEnabled(); + if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true + LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)"); + sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig); + sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain); + sKafkaVESMsgConsumerMain.start(); } else { - LOG.info("DMaaP seems to be disabled, not starting any consumer(s)"); + LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)"); } } @@ -83,26 +94,26 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis @Override public void onConfigChanged() { - if (generalConfig == null) { // Included as NullPointerException observed once in docker logs - LOG.warn("onConfigChange cannot be handled. Unexpected Null"); - return; - } - LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); - boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s) - LOG.info("DMaaP is enabled, starting consumer(s)"); - dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig); - dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain); - dmaapVESMsgConsumerMain.start(); - } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) - LOG.info("DMaaP is disabled, stopping consumer(s)"); - List<DMaaPVESMsgConsumer> consumers = dmaapConsumerMain.getConsumers(); - for (DMaaPVESMsgConsumer consumer : consumers) { + if (generalConfig == null) { // Included as NullPointerException observed once in docker logs + LOG.warn("onConfigChange cannot be handled. Unexpected Null"); + return; + } + LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled()); + boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled(); + if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s) + LOG.info("Strimzi Kafka is enabled, starting consumer(s)"); + sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig); + sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain); + sKafkaVESMsgConsumerMain.start(); + } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s) + LOG.info("Strimzi Kafka is disabled, stopping consumer(s)"); + List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers(); + for (StrimziKafkaVESMsgConsumer consumer : consumers) { // stop all consumers consumer.stopConsumer(); } } - dmaapEnabled = dmaapEnabledNewVal; + strimziEnabled = strimziEnabledNewVal; } @Override @@ -125,6 +136,4 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis } } } - - } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java index 2874c906f..2872384f1 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumer.java @@ -20,9 +20,9 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; import java.util.Properties; -public abstract interface DMaaPVESMsgConsumer extends Runnable { +public abstract interface StrimziKafkaVESMsgConsumer extends Runnable { - public abstract void init(Properties baseProperties); + public abstract void init(Properties strimziKafkaProperties, Properties properties); public abstract void processMsg(String msg) throws Exception;//Implement something like InvalidMessageException; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java index 34b8d4031..249eb612e 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -19,38 +19,38 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; -import java.util.Properties; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import java.util.List; +import java.util.Properties; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DMaaPVESMsgValidator { +public abstract class StrimziKafkaVESMsgConsumerImpl + implements StrimziKafkaVESMsgConsumer, StrimziKafkaVESMsgValidator { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerImpl.class); private static final String DEFAULT_SDNRUSER = "admin"; private static final String DEFAULT_SDNRPASSWD = "admin"; private final String name = this.getClass().getSimpleName(); - private Properties properties = null; - private MRConsumer consumer = null; + private VESMsgKafkaConsumer consumer = null; private boolean running = false; private boolean ready = false; private int fetchPause = 5000; // Default pause between fetch - 5 seconds - private int timeout = 15000; // Default timeout - 15 seconds protected final GeneralConfig generalConfig; - protected DMaaPVESMsgConsumerImpl(GeneralConfig generalConfig) { + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { this.generalConfig = generalConfig; } /* - * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns. - * If no data arrives on the topic, sleeps for a certain time period before checking again + * Thread to fetch messages from the Kafka topic. Waits for the messages to + * arrive on the topic until a certain timeout and returns. If no data arrives + * on the topic, sleeps for a certain time period before checking again */ @Override public void run() { @@ -60,35 +60,28 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM while (running) { try { boolean noData = true; - MRConsumerResponse consumerResponse = null; - consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1); - for (String msg : consumerResponse.getActualMessages()) { + List<String> consumerResponse = null; + consumerResponse = consumer.poll(); + for (String msg : consumerResponse) { noData = false; - LOG.debug("{} received ActualMessage from DMaaP VES Message topic {}", name,msg); - if(isMessageValid(msg)) { + LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg); + if (isMessageValid(msg)) { processMsg(msg); } } if (noData) { - LOG.debug("{} received ResponseCode: {}", name, consumerResponse.getResponseCode()); - LOG.debug("{} received ResponseMessage: {}", name, consumerResponse.getResponseMessage()); - if ((consumerResponse.getResponseCode() == null) - && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) { - LOG.warn("Client timeout while waiting for response from Server {}", - consumerResponse.getResponseMessage()); - } pauseThread(); } - } catch (InterruptedException e) { - LOG.warn("Caught exception reading from DMaaP VES Message Topic", e); + } catch (InterruptedException e) { + LOG.warn("Caught exception reading from Kafka Message Topic", e); Thread.currentThread().interrupt(); } catch (JsonProcessingException jsonProcessingException) { LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage()); } catch (InvalidMessageException invalidMessageException) { LOG.warn("Message is invalid because of: {}", invalidMessageException.getMessage()); } catch (Exception e) { - LOG.error("Caught exception reading from DMaaP VES Message Topic", e); + LOG.error("Caught exception reading from Kafka Message Topic", e); running = false; } } @@ -105,50 +98,19 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM } /* - * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc + * Create a Kafka consumer by specifying properties containing information such as + * topic name, timeout, URL etc */ @Override - public void init(Properties properties) { + public void init(Properties strimziKafkaProperties, Properties consumerProperties) { try { - - String timeoutStr = properties.getProperty("timeout"); - LOG.debug("timeoutStr: {}", timeoutStr); - - if ((timeoutStr != null) && (timeoutStr.length() > 0)) { - timeout = parseTimeOutValue(timeoutStr); - } - - String fetchPauseStr = properties.getProperty("fetchPause"); - LOG.debug("fetchPause(Str): {}",fetchPauseStr); - if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) { - fetchPause = parseFetchPause(fetchPauseStr); - } - LOG.debug("fetchPause: {} ",fetchPause); - - this.consumer = MRClientFactory.createConsumer(properties); + this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); + this.consumer.subscribe(consumerProperties.getProperty("topic")); ready = true; } catch (Exception e) { - LOG.error("Error initializing DMaaP VES Message consumer from file {} {}",properties, e); - } - } - - private int parseTimeOutValue(String timeoutStr) { - try { - return Integer.parseInt(timeoutStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for timeout ({})",timeoutStr); - } - return timeout; - } - - private int parseFetchPause(String fetchPauseStr) { - try { - return Integer.parseInt(fetchPauseStr); - } catch (NumberFormatException e) { - LOG.error("Non-numeric value specified for fetchPause ({})",fetchPauseStr); + LOG.error("Error initializing Kafka Message consumer from file {} {}", consumerProperties, e); } - return fetchPause; } private void pauseThread() throws InterruptedException { @@ -170,16 +132,15 @@ public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer, DM return running; } - public String getProperty(String name) { - return properties.getProperty(name, ""); - } - + /* + * public String getProperty(String name) { return properties.getProperty(name, + * ""); } + */ @Override public void stopConsumer() { running = false; } - public String getBaseUrl() { return generalConfig.getBaseUrl(); } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java index 3626f534a..03573d85b 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java @@ -23,34 +23,58 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DMaaPVESMsgConsumerMain implements Runnable { +public class StrimziKafkaVESMsgConsumerMain implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class); + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class); + Properties strimziKafkaProperties = new Properties(); private static final String _PNFREG_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer"; private static final String _FAULT_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer"; private static final String _CM_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer"; + private static final String _STNDDEFINED_FAULT_CLASS = + "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer"; private static final String _PNFREG_DOMAIN = "pnfRegistration"; private static final String _FAULT_DOMAIN = "fault"; private static final String _CM_DOMAIN = "provisioning"; + private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault"; boolean threadsRunning = false; - List<DMaaPVESMsgConsumer> consumers = new LinkedList<>(); + List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>(); private PNFRegistrationConfig pnfRegistrationConfig; private FaultConfig faultConfig; private GeneralConfig generalConfig; private ProvisioningConfig provisioningConfig; + private StndDefinedFaultConfig stndDefinedFaultConfig; + private StrimziKafkaConfig strimziKafkaConfig; - public DMaaPVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { + public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) { this.generalConfig = generalConfig; configMap.forEach(this::initialize); } + public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig, + StrimziKafkaConfig strimziKafkaConfig) { + this.generalConfig = generalConfig; + this.strimziKafkaConfig = strimziKafkaConfig; + configMap.forEach(this::initialize); + } + public void initialize(String domain, MessageConfig domainConfig) { LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig); String consumerClass; @@ -60,12 +84,6 @@ public class DMaaPVESMsgConsumerMain implements Runnable { consumerClass = _PNFREG_CLASS; LOG.debug("Consumer class = {}", consumerClass); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, - pnfRegistrationConfig.getTransportType()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, - pnfRegistrationConfig.getHostPort()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, - pnfRegistrationConfig.getContenttype()); consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, pnfRegistrationConfig.getConsumerGroup()); consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, @@ -76,61 +94,26 @@ public class DMaaPVESMsgConsumerMain implements Runnable { consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit()); consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, pnfRegistrationConfig.getFetchPause()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, - pnfRegistrationConfig.getProtocol()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, - pnfRegistrationConfig.getUsername()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, - pnfRegistrationConfig.getPassword()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - pnfRegistrationConfig.getClientReadTimeout()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - pnfRegistrationConfig.getClientConnectTimeout()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - pnfRegistrationConfig.getHTTPProxyURI()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - pnfRegistrationConfig.getHTTPProxyUsername()); - consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - pnfRegistrationConfig.getHTTPProxyPassword()); - - threadsRunning = createConsumer(_PNFREG_DOMAIN, consumerProperties); + + threadsRunning = + createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig)); } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) { this.faultConfig = (FaultConfig) domainConfig; consumerClass = _FAULT_CLASS; LOG.debug("Consumer class = {}", consumerClass); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit()); consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - faultConfig.getClientReadTimeout()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - faultConfig.getClientConnectTimeout()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - faultConfig.getHTTPProxyURI()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - faultConfig.getHTTPProxyUsername()); - consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - faultConfig.getHTTPProxyPassword()); - threadsRunning = createConsumer(_FAULT_DOMAIN, consumerProperties); + + threadsRunning = + createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig)); } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) { this.provisioningConfig = (ProvisioningConfig) domainConfig; consumerClass = _CM_CLASS; LOG.debug("Consumer class = {}", consumerClass); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, - provisioningConfig.getTransportType()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, - provisioningConfig.getHostPort()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, - provisioningConfig.getContenttype()); consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP, provisioningConfig.getConsumerGroup()); consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId()); @@ -139,26 +122,43 @@ public class DMaaPVESMsgConsumerMain implements Runnable { consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit()); consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, provisioningConfig.getFetchPause()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, provisioningConfig.getProtocol()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_USERNAME, provisioningConfig.getUsername()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_PASSWORD, provisioningConfig.getPassword()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, - provisioningConfig.getClientReadTimeout()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, - provisioningConfig.getClientConnectTimeout()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_URI, - provisioningConfig.getHTTPProxyURI()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_USER, - provisioningConfig.getHTTPProxyUsername()); - consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_CLIENT_HTTPPROXY_AUTH_PASSWORD, - provisioningConfig.getHTTPProxyPassword()); - threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties); + + threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig)); + } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) { + this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig; + consumerClass = _STNDDEFINED_FAULT_CLASS; + LOG.debug("Consumer class = {}", consumerClass); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP, + stndDefinedFaultConfig.getConsumerGroup()); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID, + stndDefinedFaultConfig.getConsumerId()); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, + stndDefinedFaultConfig.getTopic()); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, + stndDefinedFaultConfig.getTimeout()); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, + stndDefinedFaultConfig.getLimit()); + consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, + stndDefinedFaultConfig.getFetchPause()); + + threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties, + getStrimziKafkaProps(strimziKafkaConfig)); + } + } + + private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) { + if (strimziKafkaProperties.size() == 0) { + strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers()); + strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol()); + strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism()); + strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig()); } + return strimziKafkaProperties; } - private boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) { + private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) { boolean threadsRunning = false; - for (DMaaPVESMsgConsumer consumer : consumers) { + for (StrimziKafkaVESMsgConsumer consumer : consumers) { if (consumer.isRunning()) { threadsRunning = true; } @@ -166,31 +166,33 @@ public class DMaaPVESMsgConsumerMain implements Runnable { return threadsRunning; } - public boolean createConsumer(String consumerType, Properties properties) { - DMaaPVESMsgConsumerImpl consumer = null; + public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) { + StrimziKafkaVESMsgConsumerImpl consumer = null; if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN)) - consumer = new DMaaPPNFRegVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig); else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN)) - consumer = new DMaaPFaultVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig); else if (consumerType.equalsIgnoreCase(_CM_DOMAIN)) - consumer = new DMaaPCMVESMsgConsumer(generalConfig); + consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig); + else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN)) + consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig); - handleConsumer(consumer, properties, consumers); + handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers); return !consumers.isEmpty(); } - private boolean handleConsumer(DMaaPVESMsgConsumer consumer, Properties properties, - List<DMaaPVESMsgConsumer> consumers) { + private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties, + Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) { if (consumer != null) { - consumer.init(properties); + consumer.init(strimziKafkaProps, consumerProperties); if (consumer.isReady()) { Thread consumerThread = new Thread(consumer); consumerThread.start(); consumers.add(consumer); - LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), properties); + LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties); return true; } else { LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName()); @@ -218,7 +220,7 @@ public class DMaaPVESMsgConsumerMain implements Runnable { LOG.info("No listener threads running - exiting"); } - public List<DMaaPVESMsgConsumer> getConsumers() { + public List<StrimziKafkaVESMsgConsumer> getConsumers() { return consumers; } diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java index 0532334ef..b1bd2fca5 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgValidator.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgValidator.java @@ -18,7 +18,7 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; -public interface DMaaPVESMsgValidator { +public interface StrimziKafkaVESMsgValidator { boolean isMessageValid(String message); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java new file mode 100644 index 000000000..b8dee44b0 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java @@ -0,0 +1,81 @@ +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class VESMsgKafkaConsumer { + + private static final Logger log = LoggerFactory.getLogger(VESMsgKafkaConsumer.class); + final KafkaConsumer<String, String> consumer; + private final int pollTimeout; + private String topicName; + private static final String DESERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringDeserializer"; + + /** + * + * @param consumerProperties + * @param configuration The config provided to the client + */ + public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers")); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol")); + props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism")); + props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig")); + props.put(ConsumerConfig.GROUP_ID_CONFIG, + consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerGroup")); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, + consumerProperties.getProperty("topic") + "-" + consumerProperties.getProperty("consumerID")); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DESERIALIZER_CLASS); + consumer = new KafkaConsumer<>(props); + pollTimeout = Integer.parseInt(consumerProperties.getProperty("timeout")); + } + + /** + * + * @param topic The kafka topic to subscribe to + */ + public void subscribe(String topic) { + try { + consumer.subscribe(Collections.singleton(topic)); + this.topicName = topic; + } catch (InvalidGroupIdException e) { + log.error("Invalid Group {}", e.getMessage()); + } + } + + /** + * + * @return The list of records returned from the poll + */ + public List<String> poll() { + List<String> msgs = new ArrayList<>(); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout)); + for (ConsumerRecord<String, String> rec : records) { + msgs.add(rec.value()); + } + return msgs; + } + + public String getTopicName() { + return topicName; + } +} diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java index 98f02ec7a..9a01d53d1 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMBasicHeaderFieldsNotification.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMBasicHeaderFieldsNotification.java @@ -15,7 +15,7 @@ * the License. * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm; public class CMBasicHeaderFieldsNotification { private String cmNodeId; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java index 014ff648d..8de9a21d0 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotification.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotification.java @@ -15,7 +15,7 @@ * the License. * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.CmOperation; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java index 115f0f0c0..bdd83cce9 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/CMNotificationClient.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/CMNotificationClient.java @@ -16,7 +16,7 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm; import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.POST; @@ -24,6 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType; + public class CMNotificationClient extends MessageClient { private static final String CM_NOTIFICATION_URI = "rests/operations/devicemanager:push-cm-notification"; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java index 8412e3730..c32d16273 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPCMVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java @@ -16,23 +16,27 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import java.time.Instant; import java.time.ZoneId; import java.util.Iterator; import java.util.Map; + +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl { +public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class); - public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) { + public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) { super(generalConfig); - LOG.info("DMaaPCMVESMsgConsumer started successfully"); + LOG.info("StrimziKafkaCMVESMsgConsumer started successfully"); } @Override @@ -45,16 +49,16 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl { String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue(); if (notificationType.equalsIgnoreCase("notifyMOIChanges")) { - LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId); + LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId); processMoiChanges(rootNode); } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) { - LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId); + LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId); sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList")); } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) { - LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId); + LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId); sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList")); } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) { - LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId); + LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId); sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges")); } else { LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java index ce2538628..60a241831 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/FaultNotificationClient.java @@ -17,12 +17,14 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient; + import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.*; import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.*; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java index ee9d0220a..dc65732b4 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java @@ -17,7 +17,7 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -25,15 +25,18 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.util.Map; + +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl { +public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPFaultVESMsgConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class); - public DMaaPFaultVESMsgConsumer(GeneralConfig generalConfig) { + public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) { super(generalConfig); } @@ -48,32 +51,32 @@ public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl { int faultSequence; String reportingEntityName; ObjectMapper oMapper = new ObjectMapper(); - JsonNode dmaapMessageRootNode; + JsonNode sKafkaMessageRootNode; LOG.info("Fault VES Message is - {}", msg); try { - dmaapMessageRootNode = oMapper.readTree(msg); - reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); + sKafkaMessageRootNode = oMapper.readTree(msg); + reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); if (reportingEntityName.equals("ONAP SDN-R")) { LOG.info( - "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message"); + "VES Fault message generated by SDNR, hence no need to process any further; Ignoring the received message"); return; } - vesDomain = dmaapMessageRootNode.at("/event/commonEventHeader/domain").textValue(); + vesDomain = sKafkaMessageRootNode.at("/event/commonEventHeader/domain").textValue(); if (!vesDomain.equalsIgnoreCase("fault")) { - LOG.warn("Received {} domain VES Message in DMaaP Fault topic, ignoring it", vesDomain); + LOG.warn("Received {} domain VES Message in Kafka Fault topic, ignoring it", vesDomain); return; } - faultNodeId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); + faultNodeId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); faultOccurrenceTime = Instant .ofEpochMilli( - dmaapMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000) + sKafkaMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000) .atZone(ZoneId.of("Z")).toString(); - faultObjectId = dmaapMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue(); - faultReason = dmaapMessageRootNode.at("/event/faultFields/specificProblem").textValue(); - faultSeverity = dmaapMessageRootNode.at("/event/faultFields/eventSeverity").textValue(); - faultSequence = dmaapMessageRootNode.at("/event/commonEventHeader/sequence").intValue(); + faultObjectId = sKafkaMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue(); + faultReason = sKafkaMessageRootNode.at("/event/faultFields/specificProblem").textValue(); + faultSeverity = sKafkaMessageRootNode.at("/event/faultFields/eventSeverity").textValue(); + faultSequence = sKafkaMessageRootNode.at("/event/commonEventHeader/sequence").intValue(); if (faultSeverity.equalsIgnoreCase("critical")) { faultSeverity = SeverityType.Critical.toString(); diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java index fd31a3fd6..70402a878 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/PNFMountPointClient.java @@ -17,7 +17,7 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg; import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.MessageType.xml; import static org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient.SendMethod.PUT; @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.eclipse.jdt.annotation.NonNull; import org.onap.ccsdk.features.sdnr.wt.common.database.requests.BaseRequest; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.MessageClient; public class PNFMountPointClient extends MessageClient { diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java index 51d6d1950..147202fb8 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java @@ -17,32 +17,34 @@ * ============LICENSE_END========================================================================== */ -package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import org.eclipse.jdt.annotation.Nullable; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl { +public class StrimziKafkaPNFRegVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { - private static final Logger LOG = LoggerFactory.getLogger(DMaaPPNFRegVESMsgConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaPNFRegVESMsgConsumer.class); private static final String DEFAULT_PROTOCOL = "SSH"; private static final String DEFAULT_PORT = "17830"; private static final String DEFAULT_USERNAME = "netconf"; private static final String DEFAULT_PASSWORD = "netconf"; - public DMaaPPNFRegVESMsgConsumer(GeneralConfig generalConfig) { + public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) { super(generalConfig); } @Override public void processMsg(String msg) { - LOG.debug("Message from DMaaP topic is - {} ", msg); + LOG.debug("Message from Kafka topic is - {} ", msg); String pnfId; String pnfIPAddress; @Nullable @@ -57,33 +59,33 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl { String pnfPasswd = null; String reportingEntityName; ObjectMapper oMapper = new ObjectMapper(); - JsonNode dmaapMessageRootNode; + JsonNode sKafkaMessageRootNode; try { - dmaapMessageRootNode = oMapper.readTree(msg); - reportingEntityName = dmaapMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); + sKafkaMessageRootNode = oMapper.readTree(msg); + reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue(); if (reportingEntityName.equals("ONAP SDN-R")) { LOG.info( "VES PNF Registration message generated by SDNR, hence no need to process any further; Ignoring the received message"); return; } - pnfId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); - pnfIPAddress = getPNFIPAddress(dmaapMessageRootNode); + pnfId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue(); + pnfIPAddress = getPNFIPAddress(sKafkaMessageRootNode); pnfCommProtocol = - dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue(); - pnfCommPort = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue(); + sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue(); + pnfCommPort = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue(); if (pnfCommProtocol != null) { if (pnfCommProtocol.equalsIgnoreCase("TLS")) { // Read username and keyId pnfKeyId = - dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue(); - pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") + sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue(); + pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") .textValue(); } else if (pnfCommProtocol.equalsIgnoreCase("SSH")) { // Read username and password - pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") + pnfUsername = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username") .textValue(); - pnfPasswd = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password") + pnfPasswd = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password") .textValue(); } else { // log warning - Unknown protocol @@ -139,12 +141,12 @@ public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl { pnfCommPort == null || pnfUsername == null; } - private String getPNFIPAddress(JsonNode dmaapMessageRootNode) { - String ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue(); + private String getPNFIPAddress(JsonNode sKafkaMessageRootNode) { + String ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV6IpAddress").textValue(); if (ipAddress != null && ipAddress != "") return ipAddress; - ipAddress = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue(); + ipAddress = sKafkaMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue(); if (ipAddress != null && ipAddress != "") return ipAddress; diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java new file mode 100644 index 000000000..648809722 --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java @@ -0,0 +1,141 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.FaultNotificationClient; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaStndDefinedFaultVESMsgConsumer.class); + Map<String, String> payloadMapMessage = null; + String faultNodeId; + String notificationType; + + public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) { + super(generalConfig); + LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully"); + } + + /* + * Supports processing of notifyNewAlarm and notifyClearedAlarm messages ONLY + */ + @Override + public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException { + LOG.debug("Processing StndDefined Fault message {}", msg); + JsonNode rootNode = convertMessageToJsonNode(msg); + try { + + faultNodeId = rootNode.at("/event/commonEventHeader/sourceName").textValue(); + notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue(); + + if (notificationType.equalsIgnoreCase("notifyNewAlarm")) { + LOG.info("Read stndDefined Fault message of type - {} with id {} from Kafka topic", notificationType, + faultNodeId); + processNewAlarm(rootNode); + } else if (notificationType.equalsIgnoreCase("notifyClearedAlarm")) { + LOG.info("Read stdnDefined Fault message of type - {} with id {} from Kafka topic", notificationType, + faultNodeId); + processClearedAlarm(rootNode); + } else { + LOG.warn( + "Read stdnDefined Fault message of type - {} with id {} from Kafka topic. No suitable implementation for processing this message", + notificationType, faultNodeId); + throw new InvalidMessageException(); + } + // Send Fault Notification + String baseUrl = getBaseUrl(); + String sdnrUser = getSDNRUser(); + String sdnrPasswd = getSDNRPasswd(); + + FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl); + LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd); + faultClient.setAuthorization(sdnrUser, sdnrPasswd); + String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage); + faultClient.sendNotification(message); + } catch (NullPointerException e) { + LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing"); + throw new InvalidMessageException("Missing field"); + } + + } + + private void processClearedAlarm(JsonNode rootNode) { + String faultOccurrenceTime = + Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000) + .atZone(ZoneId.of("Z")).toString(); + int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue(); + String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue(); + String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue(); + String faultSeverity = + getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue()); + + payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId, + Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity); + + } + + private void processNewAlarm(JsonNode rootNode) { + String faultOccurrenceTime = + Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000) + .atZone(ZoneId.of("Z")).toString(); + int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue(); + String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue(); + String faultReason = rootNode.at("/event/stndDefinedFields/data/specificProblem").textValue(); + String faultSeverity = + getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue()); + + payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId, + Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity); + + } + + /* + * 3GPP Definition PerceivedSeverity: type: string enum: - INDETERMINATE - + * CRITICAL - MAJOR - MINOR - WARNING - CLEARED + * + */ + private String getSDNRSeverityType(String faultSeverity) { + if (faultSeverity.equalsIgnoreCase("critical")) { + faultSeverity = SeverityType.Critical.toString(); + } else if (faultSeverity.equalsIgnoreCase("major")) { + faultSeverity = SeverityType.Major.toString(); + } else if (faultSeverity.equalsIgnoreCase("minor")) { + faultSeverity = SeverityType.Minor.toString(); + } else if (faultSeverity.equalsIgnoreCase("warning") || faultSeverity.equalsIgnoreCase("indeterminate")) { + faultSeverity = SeverityType.Warning.toString(); + } else if (faultSeverity.equalsIgnoreCase("cleared")) { + faultSeverity = SeverityType.NonAlarmed.toString(); + } else { + faultSeverity = SeverityType.NonAlarmed.toString(); + } + return faultSeverity; + } + +} |