summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java28
1 files changed, 20 insertions, 8 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
index b605264c..b17eb182 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020 Wipro Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +22,14 @@
package org.onap.slice.analysis.ms.dmaap;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
/**
* Consume Notifications from DMAAP events
@@ -32,15 +37,17 @@ import org.slf4j.LoggerFactory;
public class NotificationConsumer implements Runnable {
private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class);
- private CambriaConsumer cambriaConsumer;
private NotificationCallback notificationCallback;
+ private MessageRouterSubscriber subscriber;
+ private MessageRouterSubscribeRequest request;
/**
* Parameterized Constructor.
*/
- public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) {
+ public NotificationConsumer(MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest request, NotificationCallback notificationCallback) {
super();
- this.cambriaConsumer = cambriaConsumer;
+ this.subscriber = subscriber;
+ this.request = request;
this.notificationCallback = notificationCallback;
}
@@ -50,9 +57,14 @@ public class NotificationConsumer implements Runnable {
@Override
public void run() {
try {
- Iterable<String> msgs = cambriaConsumer.fetch();
- for (String msg : msgs) {
- log.debug(msg);
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+
+ MessageRouterSubscribeResponse resp = responses.block();
+ log.debug(resp.toString());
+
+ List<JsonElement> list = resp.items();
+ for(int i=0; i<list.size(); i++){
+ String msg = list.get(i).toString();
notificationCallback.activateCallBack(msg);
}
} catch (Exception e) {