summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap
diff options
context:
space:
mode:
authordecheng zhang <decheng.zhang@huawei.com>2022-02-28 11:15:20 -0500
committerdecheng zhang <decheng.zhang@huawei.com>2022-03-18 08:55:51 -0400
commit7f2e4aa47f56085be6c95cb81b6a8bea8126d56d (patch)
tree74a5316fdad985051a5a824934c522089a76ddf3 /components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap
parentf71b9f809b64f17ac2bedd02a1ed6cbdb7573517 (diff)
Jakarta changes in slice-analysis-ms for IBN Cloud leased line update and CCVPN closed-loop. This commit contains work in sub-task: 1) AAI monitor thread; 2) bandwidth evaluator; 3) policy notification code.1.1.0-slice-analysis-ms
Issue-ID: DCAEGEN2-3063 Signed-off-by: decheng zhang <decheng.zhang@huawei.com> Change-Id: I9029ffd7563e65be59f7fd76adc2a749ff624172 Signed-off-by: decheng zhang <decheng.zhang@huawei.com>
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java96
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java70
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java283
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java380
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/VesNotificationCallback.java112
5 files changed, 920 insertions, 21 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java
new file mode 100644
index 00000000..dc2cd775
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.service.ccvpn.BandwidthEvaluator;
+import org.onap.slice.analysis.ms.service.ccvpn.Event;
+import org.onap.slice.analysis.ms.service.ccvpn.SimpleEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * Handles AAI-EVENT from dmaap
+ */
+@Component
+public class AaiEventNotificationCallback implements NotificationCallback {
+
+ private static final String EVENT_HEADER = "event-header";
+ private static final String ACTION = "action";
+ private static final String ENTITY_TYPE = "entity-type";
+ private static final String SOURCE_NAME = "source-name";
+ private static final String ENTITY = "entity";
+ private final JsonParser parser = new JsonParser();
+ private Configuration configuration;
+ private String aaiNotifTargetAction;
+ private String aaiNotifTargetSource;
+ private String aaiNotifTargetEntity;
+
+ @Autowired
+ BandwidthEvaluator bandwidthEvaluator;
+
+ @PostConstruct
+ public void init(){
+ configuration = Configuration.getInstance();
+ aaiNotifTargetAction = configuration.getAaiNotifTargetAction();
+ aaiNotifTargetSource = configuration.getAaiNotifTargetSource();
+ aaiNotifTargetEntity = configuration.getAaiNotifTargetEntity();
+ }
+
+ @Override
+ public void activateCallBack(String msg) {
+ handleNotification(msg);
+ }
+
+ private void handleNotification(String msg) {
+ JsonElement jsonElement = parser.parse(msg);
+ if (jsonElement.isJsonObject()){
+ //handle a single AAI_EVENT
+ handleMsgJsonObject(jsonElement.getAsJsonObject());
+ } else if (jsonElement.isJsonArray()){
+ //handle a series of AAI_EVENT
+ JsonArray jsonArray = jsonElement.getAsJsonArray();
+ for (int i=0,e=jsonArray.size(); i<e; i++){
+ handleMsgJsonObject(jsonArray.get(i).getAsJsonObject());
+ }
+ }
+ }
+
+ private void handleMsgJsonObject(JsonObject jsonObject){
+ JsonObject header = jsonObject.get(EVENT_HEADER).getAsJsonObject();
+ if (header.has(ACTION) && header.get(ACTION).getAsString().equals(aaiNotifTargetAction)) {
+ if (header.has(ENTITY_TYPE) && header.get(ENTITY_TYPE).getAsString().equals(aaiNotifTargetEntity)){
+ if (header.has(SOURCE_NAME) && header.get(SOURCE_NAME).getAsString().equals(aaiNotifTargetSource)) {
+ JsonObject body = jsonObject.get(ENTITY).getAsJsonObject();
+ Event event = new SimpleEvent<>(SimpleEvent.Type.ONDEMAND_CHECK, body);
+ bandwidthEvaluator.post(event);
+ }
+ }
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index 6e0f4f27..ad5941a4 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
+ * Copyright (C) 2022 Huawei Canada Limited.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,20 +39,27 @@ import org.springframework.stereotype.Component;
import com.att.nsa.cambria.client.CambriaConsumer;
/**
- * This class initializes and starts the dmaap client
+ * This class initializes and starts the dmaap client
* to listen on application required dmaap events
*/
@Component
public class DmaapClient {
+ private static final String AAI_SUBSCRIBER = "aai_subscriber";
private Configuration configuration;
private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
private DmaapUtils dmaapUtils;
-
+
@Autowired
private IntelligentSlicingCallback intelligentSlicingCallback;
+ @Autowired
+ private VesNotificationCallback vesNotificationCallback;
+
+ @Autowired
+ private AaiEventNotificationCallback aaiEventNotificationCallback;
+
/**
* init dmaap client.
*/
@@ -74,28 +82,37 @@ public class DmaapClient {
public synchronized void startClient() {
Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
-
+
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
String[] pmTopicSplit = pmTopicUrl.split("\\/");
String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
log.debug("pm topic : {}", pmTopic);
-
+
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
log.debug("policyResponse Topic : {}", policyResponseTopic);
-
+
String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
log.debug("intelligent slicing topic : {}", pmTopic);
-
+
+ // Parsing ccvpn notification topic
+ String ccvpnNotiTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url");
+ String[] ccvpnNotiTopicSplit = ccvpnNotiTopicUrl.split("\\/");
+ String ccvpnNotiTopic = ccvpnNotiTopicSplit[ccvpnNotiTopicSplit.length - 1];
+ log.debug("ccvpn notification topic : {}", ccvpnNotiTopic);
+
CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
+ // Creating ccvpn notification cambriaconsumer
+ CambriaConsumer ccvpnNotiCambriaConsumer = dmaapUtils.buildConsumer(configuration, ccvpnNotiTopic);
ScheduledExecutorService executorPool;
@@ -106,22 +123,33 @@ public class DmaapClient {
executorPool = Executors.newScheduledThreadPool(10);
executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
TimeUnit.SECONDS);
-
+
// create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
- new PolicyNotificationCallback());
- // start policy notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
-
- // create notification consumers for ML MS
- NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
- intelligentSlicingCallback);
- // start intelligent Slicing notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ new PolicyNotificationCallback());
+ // start policy notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ML MS
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
+ intelligentSlicingCallback);
+ // start intelligent Slicing notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ccvpn close-loop PM
+ NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiCambriaConsumer,
+ vesNotificationCallback);
+ executorPool = Executors.newScheduledThreadPool(1);
+ executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // start AAI-EVENT dmaap topic monitor
+ MRTopicMonitor mrTopicMonitor = new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
+ mrTopicMonitor.start();
}
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
new file mode 100644
index 00000000..0c1ac604
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
@@ -0,0 +1,283 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.dmaap.MRTopicParams;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a Dmaap message-router topic monitor.
+ * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic.
+ * So that new msg can be notified almost immediately. This is the major different from previous implementation.
+ */
+public class MRTopicMonitor implements Runnable {
+
+ private final String name;
+ private volatile boolean running = false;
+ private Configuration configuration;
+ private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+ private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+ private MRConsumerWrapper consumerWrapper;
+ private NotificationCallback callback;
+
+ /**
+ * Constructor
+ * @param name name of topic subscriber in config
+ * @param callback callbackfunction for received message
+ */
+ public MRTopicMonitor(String name, NotificationCallback callback){
+ this.name = name;
+ this.callback = callback;
+ }
+
+ /**
+ * Start the monitoring thread
+ */
+ public void start(){
+ logger.info("Starting Dmaap Bus Monitor");
+ configuration = Configuration.getInstance();
+
+ Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
+ Map<String, Object> topicParamsJson = (Map<String, Object>) streamSubscribes.get(name);
+ JsonObject jsonObject = (new Gson()).toJsonTree(topicParamsJson).getAsJsonObject();
+ consumerWrapper = buildConsumerWrapper(jsonObject);
+ running = true;
+ Executor executor = Executors.newSingleThreadExecutor();
+ executor.execute(this);
+ }
+
+ /**
+ * Main loop that keep fetching and processing
+ */
+ @Override
+ public void run(){
+ while (running){
+ try {
+ Iterable<String> dmaapMsgs = consumerWrapper.fetch();
+ for (String msg : dmaapMsgs){
+ process(msg);
+ }
+ } catch (IOException | RuntimeException e){
+ logger.error("fetchMessage encountered error: {}", e);
+ }
+ }
+ logger.info("{}: exiting thread", this);
+ }
+
+ /**
+ * Stop the monitor
+ */
+ public void stop(){
+ logger.info("{}: exiting", this);
+ running = false;
+ this.consumerWrapper.close();
+ this.consumerWrapper = null;
+ }
+
+ private void process(String msg) {
+ try {
+ callback.activateCallBack(msg);
+ } catch (Exception e){
+ logger.error("process message encountered error: {}", e);
+ }
+ }
+
+ private Iterable<String> fetch() throws IOException {
+ return this.consumerWrapper.fetch();
+ }
+
+ private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
+ throws IllegalArgumentException {
+ MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
+ return new MRConsumerWrapper(topicParams);
+ }
+
+ /**
+ * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client)
+ * A polling fashion dmaap consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny.
+ * It supports both https and http protocols.
+ */
+ private class MRConsumerWrapper {
+ /**
+ * Name of the "protocol" property.
+ */
+ protected static final String PROTOCOL_PROP = "Protocol";
+ /**
+ * Fetch timeout.
+ */
+ protected int fetchTimeout;
+
+ /**
+ * Time to sleep on a fetch failure.
+ */
+ @Getter
+ private final int sleepTime;
+
+ /**
+ * Counted down when {@link #close()} is invoked.
+ */
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
+
+ /**
+ * MR Consumer.
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * Constructs the object.
+ *
+ * @param MRTopicParams parameters for the bus topic
+ */
+ protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+ this.fetchTimeout = MRTopicParams.getFetchTimeout();
+
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH);
+ }
+
+ if (MRTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ if (MRTopicParams.isServersInvalid()) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ try{
+ this.consumer = new MRConsumerImpl.MRConsumerImplBuilder()
+ .setHostPart(MRTopicParams.getServers())
+ .setTopic(MRTopicParams.getTopic())
+ .setConsumerGroup(MRTopicParams.getConsumerGroup())
+ .setConsumerId(MRTopicParams.getConsumerInstance())
+ .setTimeoutMs(MRTopicParams.getFetchTimeout())
+ .setLimit(MRTopicParams.getFetchLimit())
+ .setApiKey(MRTopicParams.getApiKey())
+ .setApiSecret(MRTopicParams.getApiSecret())
+ .createMRConsumerImpl();
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Illegal MrConsumer parameters");
+ }
+
+
+ this.consumer.setUsername(MRTopicParams.getUserName());
+ this.consumer.setPassword(MRTopicParams.getPassword());
+
+ if(MRTopicParams.isUserNameValid() && MRTopicParams.isPasswordValid()){
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+ } else {
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
+ }
+
+ Properties props = new Properties();
+
+ if (MRTopicParams.isUseHttps()) {
+ props.setProperty(PROTOCOL_PROP, "https");
+ this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3905");
+
+ } else {
+ props.setProperty(PROTOCOL_PROP, "http");
+ this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ }
+
+ /**
+ * Try fetch new message. But backoff for some sleepTime when connection fails.
+ * @return
+ * @throws IOException
+ */
+ public Iterable<String> fetch() throws IOException {
+ final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+ if (response == null) {
+ logger.warn("{}: DMaaP NULL response received", this);
+
+ sleepAfterFetchFailure();
+ return new ArrayList<>();
+ } else {
+ logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+
+ if (!"200".equals(response.getResponseCode())) {
+
+ logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+
+ sleepAfterFetchFailure();
+ }
+ }
+
+ if (response.getActualMessages() == null) {
+ return new ArrayList<>();
+ } else {
+ return response.getActualMessages();
+ }
+ }
+
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
+ try {
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Close the dmaap client and this thread
+ */
+ public void close() {
+ this.closeCondition.countDown();
+ this.consumer.close();
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java
new file mode 100644
index 00000000..66c27413
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java
@@ -0,0 +1,380 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
+ * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2022 Huawei Canada Limited.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Partially copied from Onap Policy
+ * policy/common/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+ * Modified to fit this project.
+ * Member variables of this Params class are as follows.
+ *
+ * <p>servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ */
+@Getter
+@Setter
+public class MRTopicParams {
+
+ private int port;
+ private List<String> servers;
+ private Map<String, String> additionalProps;
+ private String topic;
+ private String effectiveTopic;
+ private String apiKey;
+ private String apiSecret;
+ private String consumerGroup;
+ private String consumerInstance;
+ private int fetchTimeout;
+ private int fetchLimit;
+ private boolean useHttps;
+ private boolean allowSelfSignedCerts;
+ private boolean managed;
+
+ private String userName;
+ private String password;
+ private String environment;
+ private String aftEnvironment;
+ private String partner;
+ private String latitude;
+ private String longitude;
+ private String partitionId;
+ private String clientName;
+ private String hostname;
+ private String basePath;
+ @Getter
+ private String serializationProvider;
+
+ public static TopicParamsBuilder builder() {
+ return new TopicParamsBuilder();
+ }
+
+ /**
+ * Methods to Check if the property is INVALID.
+ */
+
+ public boolean isEnvironmentInvalid() {
+ return StringUtils.isBlank(environment);
+ }
+
+ public boolean isAftEnvironmentInvalid() {
+ return StringUtils.isBlank(aftEnvironment);
+ }
+
+ public boolean isLatitudeInvalid() {
+ return StringUtils.isBlank(latitude);
+ }
+
+ public boolean isLongitudeInvalid() {
+ return StringUtils.isBlank(longitude);
+ }
+
+ public boolean isConsumerInstanceInvalid() {
+ return StringUtils.isBlank(consumerInstance);
+ }
+
+ public boolean isConsumerGroupInvalid() {
+ return StringUtils.isBlank(consumerGroup);
+ }
+
+ public boolean isClientNameInvalid() {
+ return StringUtils.isBlank(clientName);
+ }
+
+ public boolean isPartnerInvalid() {
+ return StringUtils.isBlank(partner);
+ }
+
+ public boolean isServersInvalid() {
+ return (servers == null || servers.isEmpty()
+ || (servers.size() == 1 && ("".equals(servers.get(0)))));
+ }
+
+ public boolean isTopicInvalid() {
+ return StringUtils.isBlank(topic);
+ }
+
+ public boolean isPartitionIdInvalid() {
+ return StringUtils.isBlank(partitionId);
+ }
+
+ public boolean isHostnameInvalid() {
+ return StringUtils.isBlank(hostname);
+ }
+
+ public boolean isPortInvalid() {
+ return (port <= 0 || port >= 65535);
+ }
+
+ /**
+ * Methods to Check if the property is Valid.
+ */
+
+ public boolean isApiKeyValid() {
+ return StringUtils.isNotBlank(apiKey);
+ }
+
+ public boolean isApiSecretValid() {
+ return StringUtils.isNotBlank(apiSecret);
+ }
+
+ public boolean isUserNameValid() {
+ return StringUtils.isNotBlank(userName);
+ }
+
+ public boolean isPasswordValid() {
+ return StringUtils.isNotBlank(password);
+ }
+
+ public boolean isAdditionalPropsValid() {
+ return additionalProps != null;
+ }
+
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
+ public static class TopicParamsBuilder {
+
+ final MRTopicParams params = new MRTopicParams();
+
+ public TopicParamsBuilder servers(List<String> servers) {
+ this.params.servers = servers;
+ return this;
+ }
+
+ public TopicParamsBuilder topic(String topic) {
+ this.params.topic = topic;
+ return this;
+ }
+
+ public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
+ this.params.effectiveTopic = effectiveTopic;
+ return this;
+ }
+
+ public TopicParamsBuilder apiKey(String apiKey) {
+ this.params.apiKey = apiKey;
+ return this;
+ }
+
+ public TopicParamsBuilder apiSecret(String apiSecret) {
+ this.params.apiSecret = apiSecret;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerGroup(String consumerGroup) {
+ this.params.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerInstance(String consumerInstance) {
+ this.params.consumerInstance = consumerInstance;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+ this.params.fetchTimeout = fetchTimeout;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchLimit(int fetchLimit) {
+ this.params.fetchLimit = fetchLimit;
+ return this;
+ }
+
+ public TopicParamsBuilder useHttps(boolean useHttps) {
+ this.params.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+ this.params.allowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public TopicParamsBuilder userName(String userName) {
+ this.params.userName = userName;
+ return this;
+ }
+
+ public TopicParamsBuilder password(String password) {
+ this.params.password = password;
+ return this;
+ }
+
+ public TopicParamsBuilder environment(String environment) {
+ this.params.environment = environment;
+ return this;
+ }
+
+ public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+ this.params.aftEnvironment = aftEnvironment;
+ return this;
+ }
+
+ public TopicParamsBuilder partner(String partner) {
+ this.params.partner = partner;
+ return this;
+ }
+
+ public TopicParamsBuilder latitude(String latitude) {
+ this.params.latitude = latitude;
+ return this;
+ }
+
+ public TopicParamsBuilder longitude(String longitude) {
+ this.params.longitude = longitude;
+ return this;
+ }
+
+ public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+ this.params.additionalProps = additionalProps;
+ return this;
+ }
+
+ public TopicParamsBuilder partitionId(String partitionId) {
+ this.params.partitionId = partitionId;
+ return this;
+ }
+
+ public MRTopicParams build() {
+ return params;
+ }
+
+ public TopicParamsBuilder buildFromConfigJson(JsonObject jsonObject) {
+ String consumerGroup = null;
+ String consumerInstance = null;
+ String aafUsername = null;
+ String aafPassword = null;
+ List<String> servers = new ArrayList<>();
+ String topic = null;
+ boolean useHttps = false;
+ int fetchTimeout = -1;
+ int fetchLimit = -1;
+
+ if (jsonObject.has("consumer_group") && !jsonObject.get("consumer_group").isJsonNull()) {
+ consumerGroup = jsonObject.get("consumer_group").getAsString();
+ }
+ if (jsonObject.has("consumer_instance") && !jsonObject.get("consumer_instance").isJsonNull()) {
+ consumerInstance = jsonObject.get("consumer_instance").getAsString();
+ }
+ if (jsonObject.has("aaf_username") && !jsonObject.get("aaf_username").isJsonNull()) {
+ aafUsername = jsonObject.get("aaf_username").getAsString();
+ }
+ if (jsonObject.has("aaf_password") && !jsonObject.get("aaf_password").isJsonNull()) {
+ aafPassword = jsonObject.get("aaf_password").getAsString();
+ }
+ if (jsonObject.has("fetch_timeout") && !jsonObject.get("fetch_timeout").isJsonNull()) {
+ fetchTimeout = jsonObject.get("fetch_timeout").getAsInt();
+ }
+ if (jsonObject.has("fetch_limit") && !jsonObject.get("fetch_limit").isJsonNull()) {
+ fetchLimit = jsonObject.get("fetch_limit").getAsInt();
+ }
+ if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) {
+ JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray();
+ servers = new ArrayList<>();
+ for (int i=0, e=jsonArray.size(); i<e; i++){
+ servers.add(jsonArray.get(i).getAsString());
+ }
+ }
+ if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) {
+ JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray();
+ for (int i=0, e=jsonArray.size(); i<e; i++){
+ servers.add(jsonArray.get(i).getAsString());
+ }
+ }
+ String topicUrl = jsonObject.get("dmaap_info").getAsJsonObject().get("topic_url").getAsString();
+ if (topicUrl.startsWith("https")){
+ useHttps = true;
+ }
+ String[] pmTopicSplit = topicUrl.split("\\/");
+ topic = pmTopicSplit[pmTopicSplit.length - 1];
+
+ this.params.topic = topic;
+ this.params.servers = servers;
+ this.params.consumerGroup = consumerGroup;
+ this.params.consumerInstance = consumerInstance;
+ this.params.password = aafPassword;
+ this.params.userName = aafUsername;
+ this.params.fetchTimeout = fetchTimeout;
+ this.params.fetchLimit = fetchLimit;
+ this.params.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder managed(boolean managed) {
+ this.params.managed = managed;
+ return this;
+ }
+
+ public TopicParamsBuilder hostname(String hostname) {
+ this.params.hostname = hostname;
+ return this;
+ }
+
+ public TopicParamsBuilder clientName(String clientName) {
+ this.params.clientName = clientName;
+ return this;
+ }
+
+ public TopicParamsBuilder port(int port) {
+ this.params.port = port;
+ return this;
+ }
+
+ public TopicParamsBuilder basePath(String basePath) {
+ this.params.basePath = basePath;
+ return this;
+ }
+
+ public TopicParamsBuilder serializationProvider(String serializationProvider) {
+ this.params.serializationProvider = serializationProvider;
+ return this;
+ }
+
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/VesNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/VesNotificationCallback.java
new file mode 100644
index 00000000..83bfcdbe
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/VesNotificationCallback.java
@@ -0,0 +1,112 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.models.vesnotification.NotificationFields;
+
+import org.onap.slice.analysis.ms.service.ccvpn.CCVPNPmDatastore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+
+/**
+ * Handles Notification on dmaap for ves notification events
+ */
+@Component
+public class VesNotificationCallback implements NotificationCallback {
+
+ private Configuration configuration;
+ private String NOTIFICATIONFIELDS = "notificationFields";
+ private String EVENT = "event";
+ private String vesNotifChangeIdentifier;
+ private String vesNotfiChangeType;
+
+ @Autowired
+ CCVPNPmDatastore ccvpnPmDatastore;
+
+ private static Logger log = LoggerFactory.getLogger(VesNotificationCallback.class);
+
+ /**
+ * init ves callback; load configuration.
+ */
+ @PostConstruct
+ public void init(){
+ configuration = Configuration.getInstance();
+ vesNotifChangeIdentifier = configuration.getVesNotifChangeIdentifier();
+ vesNotfiChangeType = configuration.getVesNotifChangeType();
+ }
+
+ /**
+ * Triggers on handleNofitication method
+ * @param msg incoming message
+ */
+ @Override
+ public void activateCallBack(String msg) {
+ handleNotification(msg);
+ }
+
+ /**
+ * Parse Performance dmaap notification and save to DB
+ * @param msg incoming message
+ */
+ private void handleNotification(String msg) {
+ log.info("Message received from VES : {}" ,msg);
+ ObjectMapper obj = new ObjectMapper();
+ NotificationFields output = null;
+ String notifChangeIdentifier = "";
+ String notifChangeType = "";
+ String cllId = null;
+ String uniId = null;
+ String bw = null;
+ try {
+ JsonNode node = obj.readTree(msg);
+ JsonNode notificationNode = node.get(EVENT).get(NOTIFICATIONFIELDS);
+ output = obj.treeToValue(notificationNode, NotificationFields.class);
+
+ //Filter out target notification changeIdentifier and changeType
+ notifChangeIdentifier = output.getChangeIdentifier();
+ notifChangeType = output.getChangeType();
+ if (notifChangeType.equals(vesNotfiChangeType)
+ && notifChangeIdentifier.equals(vesNotifChangeIdentifier)) {
+ cllId = output.getArrayOfNamedHashMap().get(0).getHashMap().getCllId();
+ uniId = output.getArrayOfNamedHashMap().get(0).getHashMap().getUniId();
+ bw = output.getArrayOfNamedHashMap().get(0).getHashMap().getBandwidthValue();
+ }
+ }
+ catch (IOException e) {
+ log.error("Error converting VES msg to object, {}", e.getMessage());
+ }
+ if (cllId != null && uniId != null && bw != null){
+ ccvpnPmDatastore.addUsedBwToEndpoint(cllId, uniId, bw);
+ }
+
+ }
+
+}