diff options
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java')
-rw-r--r-- | components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java index b605264c..b17eb182 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationConsumer.java @@ -3,6 +3,7 @@ * slice-analysis-ms * ================================================================================ * Copyright (C) 2020 Wipro Limited. + * Copyright (C) 2022 CTC, Inc. * ============================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +22,14 @@ package org.onap.slice.analysis.ms.dmaap; -import com.att.nsa.cambria.client.CambriaConsumer; - +import com.google.gson.JsonElement; +import io.vavr.collection.List; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; /** * Consume Notifications from DMAAP events @@ -32,15 +37,17 @@ import org.slf4j.LoggerFactory; public class NotificationConsumer implements Runnable { private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); - private CambriaConsumer cambriaConsumer; private NotificationCallback notificationCallback; + private MessageRouterSubscriber subscriber; + private MessageRouterSubscribeRequest request; /** * Parameterized Constructor. */ - public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { + public NotificationConsumer(MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest request, NotificationCallback notificationCallback) { super(); - this.cambriaConsumer = cambriaConsumer; + this.subscriber = subscriber; + this.request = request; this.notificationCallback = notificationCallback; } @@ -50,9 +57,14 @@ public class NotificationConsumer implements Runnable { @Override public void run() { try { - Iterable<String> msgs = cambriaConsumer.fetch(); - for (String msg : msgs) { - log.debug(msg); + Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request); + + MessageRouterSubscribeResponse resp = responses.block(); + log.debug(resp.toString()); + + List<JsonElement> list = resp.items(); + for(int i=0; i<list.size(); i++){ + String msg = list.get(i).toString(); notificationCallback.activateCallBack(msg); } } catch (Exception e) { |