summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java96
1 files changed, 23 insertions, 73 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
index aa1bc964..f3330dc3 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
@@ -3,6 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,23 +22,23 @@
package org.onap.slice.analysis.ms.dmaap;
import com.google.gson.Gson;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import io.vavr.collection.List;
import lombok.Getter;
import lombok.NonNull;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.slice.analysis.ms.models.Configuration;
-import org.onap.slice.analysis.ms.dmaap.MRTopicParams;
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -92,11 +93,11 @@ public class MRTopicMonitor implements Runnable {
while (running){
try {
logger.debug("Topic: {} getting new msg...", name);
- Iterable<String> dmaapMsgs = consumerWrapper.fetch();
- for (String msg : dmaapMsgs){
+ List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
+ for (JsonElement msg : dmaapMsgs){
logger.debug("Received message: {}" +
"\r\n and processing start", msg);
- process(msg);
+ process(msg.toString());
}
} catch (IOException | RuntimeException e){
logger.error("fetchMessage encountered error: {}", e);
@@ -123,7 +124,7 @@ public class MRTopicMonitor implements Runnable {
}
}
- private Iterable<String> fetch() throws IOException {
+ private List<JsonElement> fetch() throws IOException {
return this.consumerWrapper.fetch();
}
@@ -159,10 +160,8 @@ public class MRTopicMonitor implements Runnable {
*/
private final CountDownLatch closeCondition = new CountDownLatch(1);
- /**
- * MR Consumer.
- */
- protected MRConsumerImpl consumer;
+ protected MessageRouterSubscriber subscriber;
+ protected MessageRouterSubscribeRequest request;
/**
* Constructs the object.
@@ -188,42 +187,13 @@ public class MRTopicMonitor implements Runnable {
}
try{
- this.consumer = new MRConsumerImpl.MRConsumerImplBuilder()
- .setHostPart(MRTopicParams.getServers())
- .setTopic(MRTopicParams.getTopic())
- .setConsumerGroup(MRTopicParams.getConsumerGroup())
- .setConsumerId(MRTopicParams.getConsumerInstance())
- .setTimeoutMs(MRTopicParams.getFetchTimeout())
- .setLimit(MRTopicParams.getFetchLimit())
- .setApiKey(MRTopicParams.getApiKey())
- .setApiSecret(MRTopicParams.getApiSecret())
- .createMRConsumerImpl();
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Illegal MrConsumer parameters");
- }
-
-
- this.consumer.setUsername(MRTopicParams.getUserName());
- this.consumer.setPassword(MRTopicParams.getPassword());
-
- if(MRTopicParams.isUserNameValid() && MRTopicParams.isPasswordValid()){
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
- } else {
- this.consumer.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue());
- }
-
- Properties props = new Properties();
-
- if (MRTopicParams.isUseHttps()) {
- props.setProperty(PROTOCOL_PROP, "https");
- this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3905");
+ this.subscriber = DcaeDmaapUtil.buildSubscriber();
+ this.request = DcaeDmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
- } else {
- props.setProperty(PROTOCOL_PROP, "http");
- this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3904");
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Illegal MrConsumer parameters");
}
- this.consumer.setProps(props);
}
/**
@@ -231,31 +201,12 @@ public class MRTopicMonitor implements Runnable {
* @return
* @throws IOException
*/
- public Iterable<String> fetch() throws IOException {
- final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- sleepAfterFetchFailure();
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
- response.getResponseMessage());
-
- if (!"200".equals(response.getResponseCode())) {
-
- logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ public List<JsonElement> fetch() throws IOException {
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+ MessageRouterSubscribeResponse resp = responses.block();
+ List<JsonElement> list = resp.items();
+ return list;
- sleepAfterFetchFailure();
- }
- }
-
- if (response.getActualMessages() == null) {
- return new ArrayList<>();
- } else {
- return response.getActualMessages();
- }
}
/**
@@ -280,7 +231,6 @@ public class MRTopicMonitor implements Runnable {
*/
public void close() {
this.closeCondition.countDown();
- this.consumer.close();
}
}
}