summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java
diff options
context:
space:
mode:
authorzhaoyh6 <zhaoyh6@asiainfo.com>2022-07-28 16:33:38 +0800
committerzhao yehua <zhaoyh6@asiainfo.com>2022-08-08 02:11:59 +0000
commit61f9c604018a27bf9438415aca03d05dc9974dcb (patch)
treee15678f9104d488c9c1c2cbf2a39dba59aa458fa /components/slice-analysis-ms/src/main/java
parent015b7529adc61181862c84a20ed7140a96e479dc (diff)
feat:Enhance sliceanalysis MS to use DCAE SDK dmaap-client lib
Issue-ID: DCAEGEN2-3120 Signed-off-by: zhaoyh6 <zhaoyh6@asiainfo.com> Change-Id: I946c7a4b49906cb402062235a97452fb7856c8f0
Diffstat (limited to 'components/slice-analysis-ms/src/main/java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java3
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java52
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java96
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java9
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java28
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java21
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java20
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java4
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java89
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java141
10 files changed, 188 insertions, 275 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java
index 0259f130..9bff14a0 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -105,7 +106,7 @@ public class AaiEventNotificationCallback implements NotificationCallback {
}
JsonObject entity = jsonObject.get(ENTITY).getAsJsonObject();
JsonObject body = getNestedJsonObject(entity, aaiNotifTargetEntity);
- logger.debug("AAI-EVENT entity object {}", body);
+ logger.info("AAI-EVENT entity object {}", body);
if (body == null){
return;
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index ad5941a4..6e4dbe18 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -4,6 +4,7 @@
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,15 +30,15 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.utils.DmaapUtils;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
/**
* This class initializes and starts the dmaap client
* to listen on application required dmaap events
@@ -49,8 +50,6 @@ public class DmaapClient {
private Configuration configuration;
private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
- private DmaapUtils dmaapUtils;
-
@Autowired
private IntelligentSlicingCallback intelligentSlicingCallback;
@@ -66,7 +65,6 @@ public class DmaapClient {
@PostConstruct
public void initClient() {
log.debug("initializing client");
- dmaapUtils = new DmaapUtils();
configuration = Configuration.getInstance();
if (log.isDebugEnabled()) {
log.debug(configuration.toString());
@@ -85,39 +83,33 @@ public class DmaapClient {
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
- String[] pmTopicSplit = pmTopicUrl.split("\\/");
- String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
- log.debug("pm topic : {}", pmTopic);
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
- String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
- String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
- log.debug("policyResponse Topic : {}", policyResponseTopic);
String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
- String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
- String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
- log.debug("intelligent slicing topic : {}", pmTopic);
// Parsing ccvpn notification topic
String ccvpnNotiTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("ves_ccvpn_notification_topic")).get("dmaap_info")).get("topic_url");
- String[] ccvpnNotiTopicSplit = ccvpnNotiTopicUrl.split("\\/");
- String ccvpnNotiTopic = ccvpnNotiTopicSplit[ccvpnNotiTopicSplit.length - 1];
- log.debug("ccvpn notification topic : {}", ccvpnNotiTopic);
- CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
- CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
- CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
- // Creating ccvpn notification cambriaconsumer
- CambriaConsumer ccvpnNotiCambriaConsumer = dmaapUtils.buildConsumer(configuration, ccvpnNotiTopic);
+ MessageRouterSubscriber pmNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest pmNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("performance_management_topic", pmTopicUrl);
+
+ MessageRouterSubscriber policyNotifSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest policyNotifReqest = DcaeDmaapUtil.buildSubscriberRequest("dcae_cl_response_topic", policyResponseTopicUrl);
+
+ MessageRouterSubscriber intelligentSlicingSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest intelligentSlicingReqest = DcaeDmaapUtil.buildSubscriberRequest("intelligent_slicing_topic", intelligentSlicingTopicUrl);
+
+ MessageRouterSubscriber ccvpnNotiSubscriber = DcaeDmaapUtil.buildSubscriber();
+ MessageRouterSubscribeRequest ccvpnNotiReqest = DcaeDmaapUtil.buildSubscriberRequest("ves_ccvpn_notification_topic", ccvpnNotiTopicUrl);
ScheduledExecutorService executorPool;
// create notification consumers for PM
- NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+ NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, pmNotifReqest,
new PmNotificationCallback());
// start pm notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -125,7 +117,7 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyNotifSubscriber, policyNotifReqest,
new PolicyNotificationCallback());
// start policy notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -133,7 +125,7 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for ML MS
- NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingSubscriber, intelligentSlicingReqest,
intelligentSlicingCallback);
// start intelligent Slicing notification consumer threads
executorPool = Executors.newScheduledThreadPool(10);
@@ -141,15 +133,19 @@ public class DmaapClient {
TimeUnit.SECONDS);
// create notification consumers for ccvpn close-loop PM
- NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiCambriaConsumer,
+ NotificationConsumer ccvpnNotiConsumer = new NotificationConsumer(ccvpnNotiSubscriber, ccvpnNotiReqest,
vesNotificationCallback);
executorPool = Executors.newScheduledThreadPool(1);
executorPool.scheduleWithFixedDelay(ccvpnNotiConsumer, 0, configuration.getVesNotifPollingInterval(),
TimeUnit.SECONDS);
// start AAI-EVENT dmaap topic monitor
- MRTopicMonitor mrTopicMonitor = new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
+ MRTopicMonitor mrTopicMonitor = getMRTopicMonitor();
mrTopicMonitor.start();
}
+ public MRTopicMonitor getMRTopicMonitor() {
+ return new MRTopicMonitor(AAI_SUBSCRIBER, aaiEventNotificationCallback);
+ }
+
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
index aa1bc964..f3330dc3 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,23 +22,23 @@
package org.onap.slice.analysis.ms.dmaap;
import com.google.gson.Gson;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import io.vavr.collection.List;
import lombok.Getter;
import lombok.NonNull;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.dmaap.MRTopicParams;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -92,11 +93,11 @@ public class MRTopicMonitor implements Runnable {
while (running){
try {
logger.debug("Topic: {} getting new msg...", name);
- Iterable<String> dmaapMsgs = consumerWrapper.fetch();
- for (String msg : dmaapMsgs){
+ List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
+ for (JsonElement msg : dmaapMsgs){
logger.debug("Received message: {}" +
"\r\n and processing start", msg);
- process(msg);
+ process(msg.toString());
}
} catch (IOException | RuntimeException e){
logger.error("fetchMessage encountered error: {}", e);
@@ -123,7 +124,7 @@ public class MRTopicMonitor implements Runnable {
}
}
- private Iterable<String> fetch() throws IOException {
+ private List<JsonElement> fetch() throws IOException {
return this.consumerWrapper.fetch();
}
@@ -159,10 +160,8 @@ public class MRTopicMonitor implements Runnable {
*/
private final CountDownLatch closeCondition = new CountDownLatch(1);
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
+ protected MessageRouterSubscriber subscriber;
+ protected MessageRouterSubscribeRequest request;
/**
* Constructs the object.
@@ -188,42 +187,13 @@ public class MRTopicMonitor implements Runnable {
}
try{
- this.consumer = new MRConsumerImpl.MRConsumerImplBuilder()
- .setHostPart(MRTopicParams.getServers())
- .setTopic(MRTopicParams.getTopic())
- .setConsumerGroup(MRTopicParams.getConsumerGroup())
- .setConsumerId(MRTopicParams.getConsumerInstance())
- .setTimeoutMs(MRTopicParams.getFetchTimeout())
- .setLimit(MRTopicParams.getFetchLimit())
- .setApiKey(MRTopicParams.getApiKey())
- .setApiSecret(MRTopicParams.getApiSecret())
- .createMRConsumerImpl();
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Illegal MrConsumer parameters");
- }
-
-
- this.consumer.setUsername(MRTopicParams.getUserName());
- this.consumer.setPassword(MRTopicParams.getPassword());
-
- if(MRTopicParams.isUserNameValid() && MRTopicParams.isPasswordValid()){
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- } else {
- this.consumer.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
- }
-
- Properties props = new Properties();
-
- if (MRTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3905");
+ this.subscriber = DcaeDmaapUtil.buildSubscriber();
+ this.request = DcaeDmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3904");
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Illegal MrConsumer parameters");
}
- this.consumer.setProps(props);
}
/**
@@ -231,31 +201,12 @@ public class MRTopicMonitor implements Runnable {
* @return
* @throws IOException
*/
- public Iterable<String> fetch() throws IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ public List<JsonElement> fetch() throws IOException {
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+ MessageRouterSubscribeResponse resp = responses.block();
+ List<JsonElement> list = resp.items();
+ return list;
- sleepAfterFetchFailure();
- }
- }
-
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
- }
}
/**
@@ -280,7 +231,6 @@ public class MRTopicMonitor implements Runnable {
*/
public void close() {
this.closeCondition.countDown();
- this.consumer.close();
}
}
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java
index e5aaa1e3..26f16baf 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicParams.java
@@ -3,9 +3,10 @@
* ONAP
* ================================================================================
* Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
- * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
- * Modifications Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -329,7 +330,7 @@ public class MRTopicParams {
String[] pmTopicSplit = topicUrl.split("\\/");
topic = pmTopicSplit[pmTopicSplit.length - 1];
- this.params.topic = topic;
+ this.params.topic = topicUrl;
this.params.servers = servers;
this.params.consumerGroup = consumerGroup;
this.params.consumerInstance = consumerInstance;
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
index b605264c..b17eb182 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +22,14 @@
package org.onap.slice.analysis.ms.dmaap;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
/**
* Consume Notifications from DMAAP events
@@ -32,15 +37,17 @@ import org.slf4j.LoggerFactory;
public class NotificationConsumer implements Runnable {
private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class);
- private CambriaConsumer cambriaConsumer;
private NotificationCallback notificationCallback;
+ private MessageRouterSubscriber subscriber;
+ private MessageRouterSubscribeRequest request;
/**
* Parameterized Constructor.
*/
- public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) {
+ public NotificationConsumer(MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest request, NotificationCallback notificationCallback) {
super();
- this.cambriaConsumer = cambriaConsumer;
+ this.subscriber = subscriber;
+ this.request = request;
this.notificationCallback = notificationCallback;
}
@@ -50,9 +57,14 @@ public class NotificationConsumer implements Runnable {
@Override
public void run() {
try {
- Iterable<String> msgs = cambriaConsumer.fetch();
- for (String msg : msgs) {
- log.debug(msg);
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+
+ MessageRouterSubscribeResponse resp = responses.block();
+ log.debug(resp.toString());
+
+ List<JsonElement> list = resp.items();
+ for(int i=0; i<list.size(); i++){
+ String msg = list.get(i).toString();
notificationCallback.activateCallBack(msg);
}
} catch (Exception e) {
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
index ce362019..55f24a65 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationProducer.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020-2021 Wipro Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +22,11 @@
package org.onap.slice.analysis.ms.dmaap;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.gson.JsonPrimitive;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
import java.io.IOException;
@@ -30,21 +35,25 @@ import java.io.IOException;
*/
public class NotificationProducer {
- private CambriaBatchingPublisher cambriaBatchingPublisher;
+ private MessageRouterPublisher publisher;
+ private MessageRouterPublishRequest request;
/**
* Parameterized constructor.
*/
- public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
+ public NotificationProducer(MessageRouterPublisher publisher, MessageRouterPublishRequest request) {
super();
- this.cambriaBatchingPublisher = cambriaBatchingPublisher;
+ this.publisher = publisher;
+ this.request = request;
}
/**
* sends notification to dmaap.
*/
- public int sendNotification(String msg) throws IOException {
- return cambriaBatchingPublisher.send("", msg);
+ public void sendNotification(String msg) throws IOException {
+ Flux<JsonPrimitive> singleMessage = Flux.just(msg).map(JsonPrimitive::new);
+ Flux<MessageRouterPublishResponse> result = this.publisher.put(request, singleMessage);
+ result.then().block();
}
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
index 06604040..04ee151f 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,24 +22,22 @@
package org.onap.slice.analysis.ms.dmaap;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import java.io.IOException;
import java.util.Map;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.utils.DmaapUtils;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
/**
* Client class to handle Policy interactions
*/
public class PolicyDmaapClient {
- private DmaapUtils dmaapUtils;
-
private Configuration configuration;
- public PolicyDmaapClient(DmaapUtils dmaapUtils, Configuration configuration) {
- this.dmaapUtils = dmaapUtils;
+ public PolicyDmaapClient(Configuration configuration) {
this.configuration = configuration;
}
@@ -50,14 +49,11 @@ public class PolicyDmaapClient {
Map<String, Object> streamsPublishes = configuration.getStreamsPublishes();
String policyTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamsPublishes.get("CL_topic"))
.get("dmaap_info")).get("topic_url");
- String[] policyTopicSplit = policyTopicUrl.split("\\/");
- String policyTopic = policyTopicSplit[policyTopicSplit.length - 1];
- CambriaBatchingPublisher cambriaBatchingPublisher;
try {
+ MessageRouterPublisher publisher = DcaeDmaapUtil.buildPublisher();
+ MessageRouterPublishRequest request = DcaeDmaapUtil.buildPublisherRequest("CL_topic", policyTopicUrl);
- cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, policyTopic);
-
- NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher);
+ NotificationProducer notificationProducer = new NotificationProducer(publisher, request);
notificationProducer.sendNotification(msg);
} catch (IOException e) {
return false;
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
index 729fdc5a..24aeea61 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
@@ -4,6 +4,7 @@
* ================================================================================
* Copyright (C) 2020-2021 Wipro Limited.
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,7 +40,6 @@ import org.onap.slice.analysis.ms.models.policy.Payload;
import org.onap.slice.analysis.ms.models.policy.Sla;
import org.onap.slice.analysis.ms.models.policy.TransportNetwork;
import org.onap.slice.analysis.ms.service.ccvpn.RequestOwner;
-import org.onap.slice.analysis.ms.utils.DmaapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -61,7 +61,7 @@ public class PolicyService {
@PostConstruct
public void init() {
Configuration configuration = Configuration.getInstance();
- policyDmaapClient = new PolicyDmaapClient(new DmaapUtils(), configuration);
+ policyDmaapClient = new PolicyDmaapClient(configuration);
}
protected <T> OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java
new file mode 100644
index 00000000..9a5bf711
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DcaeDmaapUtil.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 CTC, Inc.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.utils;
+
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.slice.analysis.ms.models.Configuration;
+
+public class DcaeDmaapUtil {
+ public static MessageRouterSubscriber buildSubscriber(){
+ MessageRouterSubscriberConfig connectionPoolConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+ .connectionPool(16)
+ .maxIdleTime(10) //in seconds
+ .maxLifeTime(20) //in seconds
+ .build())
+ .build();
+
+ MessageRouterSubscriber cut = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration);
+ return cut;
+ }
+
+ public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+ MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name(name)
+ .topicUrl(topicUrl)
+ .build();
+ MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup("1")
+ .consumerId("1")
+ .sourceDefinition(sourceDefinition)
+ .build();
+
+ return request;
+ }
+
+ public static MessageRouterPublisher buildPublisher(){
+ MessageRouterPublisher pub = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ return pub;
+ }
+
+ public static MessageRouterPublishRequest buildPublisherRequest(String name, String topicUrl){
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name(name)
+ .topicUrl(topicUrl)
+ .build();
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.TEXT_PLAIN)
+ .build();
+ return request;
+ }
+
+
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java
deleted file mode 100644
index 6e1cf912..00000000
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/utils/DmaapUtils.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * slice-analysis-ms
- * ================================================================================
- * Copyright (C) 2020-2021 Wipro Limited.
- * ==============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- *******************************************************************************/
-
-package org.onap.slice.analysis.ms.utils;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClient;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.att.nsa.cambria.client.CambriaTopicManager;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-
-import org.onap.slice.analysis.ms.models.Configuration;
-
-/**
- * Utility class to perform actions related to Dmaap
- */
-public class DmaapUtils {
-
- /**
- * Build publisher.
- */
- public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
- try {
- return builder(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
-
- }
- }
-
- /**
- * Build consumer.
- */
- public CambriaConsumer buildConsumer(Configuration config, String topic) {
-
- try {
- return builderConsumer(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
- }
-
- }
-
- private static PublisherBuilder builder(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedBuilder(config, topic);
- } else {
- return unAuthenticatedBuilder(config, topic);
- }
- }
-
- private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
- return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
- }
-
- private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .logSendFailuresAfter(5);
- }
-
- private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedConsumerBuilder(config, topic);
- } else {
- return unAuthenticatedConsumerBuilder(config, topic);
- }
- }
-
- private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
- }
-
- private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
- return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
- }
-
- /**
- * Build cambriaClient.
- */
- public CambriaTopicManager cambriaCLientBuilder(Configuration configuration) {
- if (configuration.isSecured()) {
- return authenticatedCambriaCLientBuilder(configuration);
- } else {
- return unAuthenticatedCambriaCLientBuilder(configuration);
-
- }
- }
-
- private static CambriaTopicManager authenticatedCambriaCLientBuilder(Configuration config) {
- try {
- return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers())
- .authenticatedByHttp(config.getAafUsername(), config.getAafPassword()));
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
- }
- }
-
- private static CambriaTopicManager unAuthenticatedCambriaCLientBuilder(Configuration config) {
- try {
- return buildCambriaClient(new TopicManagerBuilder().usingHosts(config.getDmaapServers()));
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
-
- }
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends CambriaClient> T buildCambriaClient(
- CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
- throws MalformedURLException, GeneralSecurityException {
- return (T) client.build();
- }
-
-}