diff options
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java')
-rw-r--r-- | components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java | 96 |
1 files changed, 23 insertions, 73 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java index aa1bc964..f3330dc3 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java @@ -3,6 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,23 +22,23 @@ package org.onap.slice.analysis.ms.dmaap; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import io.vavr.collection.List; import lombok.Getter; import lombok.NonNull; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.dmaap.MRTopicParams; +import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; import java.io.IOException; -import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -92,11 +93,11 @@ public class MRTopicMonitor implements Runnable { while (running){ try { logger.debug("Topic: {} getting new msg...", name); - Iterable<String> dmaapMsgs = consumerWrapper.fetch(); - for (String msg : dmaapMsgs){ + List<JsonElement> dmaapMsgs = consumerWrapper.fetch(); + for (JsonElement msg : dmaapMsgs){ logger.debug("Received message: {}" + "\r\n and processing start", msg); - process(msg); + process(msg.toString()); } } catch (IOException | RuntimeException e){ logger.error("fetchMessage encountered error: {}", e); @@ -123,7 +124,7 @@ public class MRTopicMonitor implements Runnable { } } - private Iterable<String> fetch() throws IOException { + private List<JsonElement> fetch() throws IOException { return this.consumerWrapper.fetch(); } @@ -159,10 +160,8 @@ public class MRTopicMonitor implements Runnable { */ private final CountDownLatch closeCondition = new CountDownLatch(1); - /** - * MR Consumer. - */ - protected MRConsumerImpl consumer; + protected MessageRouterSubscriber subscriber; + protected MessageRouterSubscribeRequest request; /** * Constructs the object. @@ -188,42 +187,13 @@ public class MRTopicMonitor implements Runnable { } try{ - this.consumer = new MRConsumerImpl.MRConsumerImplBuilder() - .setHostPart(MRTopicParams.getServers()) - .setTopic(MRTopicParams.getTopic()) - .setConsumerGroup(MRTopicParams.getConsumerGroup()) - .setConsumerId(MRTopicParams.getConsumerInstance()) - .setTimeoutMs(MRTopicParams.getFetchTimeout()) - .setLimit(MRTopicParams.getFetchLimit()) - .setApiKey(MRTopicParams.getApiKey()) - .setApiSecret(MRTopicParams.getApiSecret()) - .createMRConsumerImpl(); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Illegal MrConsumer parameters"); - } - - - this.consumer.setUsername(MRTopicParams.getUserName()); - this.consumer.setPassword(MRTopicParams.getPassword()); - - if(MRTopicParams.isUserNameValid() && MRTopicParams.isPasswordValid()){ - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - } else { - this.consumer.setProtocolFlag(ProtocolTypeConstants.HTTPNOAUTH.getValue()); - } - - Properties props = new Properties(); - - if (MRTopicParams.isUseHttps()) { - props.setProperty(PROTOCOL_PROP, "https"); - this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3905"); + this.subscriber = DcaeDmaapUtil.buildSubscriber(); + this.request = DcaeDmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic()); - } else { - props.setProperty(PROTOCOL_PROP, "http"); - this.consumer.setHost(MRTopicParams.getServers().get(0) + ":3904"); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal MrConsumer parameters"); } - this.consumer.setProps(props); } /** @@ -231,31 +201,12 @@ public class MRTopicMonitor implements Runnable { * @return * @throws IOException */ - public Iterable<String> fetch() throws IOException { - final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - sleepAfterFetchFailure(); - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), - response.getResponseMessage()); - - if (!"200".equals(response.getResponseCode())) { - - logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + public List<JsonElement> fetch() throws IOException { + Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request); + MessageRouterSubscribeResponse resp = responses.block(); + List<JsonElement> list = resp.items(); + return list; - sleepAfterFetchFailure(); - } - } - - if (response.getActualMessages() == null) { - return new ArrayList<>(); - } else { - return response.getActualMessages(); - } } /** @@ -280,7 +231,6 @@ public class MRTopicMonitor implements Runnable { */ public void close() { this.closeCondition.countDown(); - this.consumer.close(); } } } |