summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java236
1 files changed, 236 insertions, 0 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
new file mode 100644
index 00000000..f3330dc3
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/MRTopicMonitor.java
@@ -0,0 +1,236 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.slice.analysis.ms.models.Configuration;
+
+import org.onap.slice.analysis.ms.utils.DcaeDmaapUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a Dmaap message-router topic monitor.
+ * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic.
+ * So that new msg can be notified almost immediately. This is the major different from previous implementation.
+ */
+public class MRTopicMonitor implements Runnable {
+
+ private final String name;
+ private volatile boolean running = false;
+ private Configuration configuration;
+ private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+ private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+ private MRConsumerWrapper consumerWrapper;
+ private NotificationCallback callback;
+
+ /**
+ * Constructor
+ * @param name name of topic subscriber in config
+ * @param callback callbackfunction for received message
+ */
+ public MRTopicMonitor(String name, NotificationCallback callback){
+ this.name = name;
+ this.callback = callback;
+ }
+
+ /**
+ * Start the monitoring thread
+ */
+ public void start(){
+ logger.info("Starting Dmaap Bus Monitor");
+ configuration = Configuration.getInstance();
+
+ Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
+ Map<String, Object> topicParamsJson = (Map<String, Object>) streamSubscribes.get(name);
+ JsonObject jsonObject = (new Gson()).toJsonTree(topicParamsJson).getAsJsonObject();
+ consumerWrapper = buildConsumerWrapper(jsonObject);
+ running = true;
+ Executor executor = Executors.newSingleThreadExecutor();
+ executor.execute(this);
+ }
+
+ /**
+ * Main loop that keep fetching and processing
+ */
+ @Override
+ public void run(){
+ while (running){
+ try {
+ logger.debug("Topic: {} getting new msg...", name);
+ List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
+ for (JsonElement msg : dmaapMsgs){
+ logger.debug("Received message: {}" +
+ "\r\n and processing start", msg);
+ process(msg.toString());
+ }
+ } catch (IOException | RuntimeException e){
+ logger.error("fetchMessage encountered error: {}", e);
+ }
+ }
+ logger.info("{}: exiting thread", this);
+ }
+
+ /**
+ * Stop the monitor
+ */
+ public void stop(){
+ logger.info("{}: exiting", this);
+ running = false;
+ this.consumerWrapper.close();
+ this.consumerWrapper = null;
+ }
+
+ private void process(String msg) {
+ try {
+ callback.activateCallBack(msg);
+ } catch (Exception e){
+ logger.error("process message encountered error: {}", e);
+ }
+ }
+
+ private List<JsonElement> fetch() throws IOException {
+ return this.consumerWrapper.fetch();
+ }
+
+ private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
+ throws IllegalArgumentException {
+ MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
+ return new MRConsumerWrapper(topicParams);
+ }
+
+ /**
+ * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client)
+ * A polling fashion dmaap consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny.
+ * It supports both https and http protocols.
+ */
+ private class MRConsumerWrapper {
+ /**
+ * Name of the "protocol" property.
+ */
+ protected static final String PROTOCOL_PROP = "Protocol";
+ /**
+ * Fetch timeout.
+ */
+ protected int fetchTimeout;
+
+ /**
+ * Time to sleep on a fetch failure.
+ */
+ @Getter
+ private final int sleepTime;
+
+ /**
+ * Counted down when {@link #close()} is invoked.
+ */
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
+
+ protected MessageRouterSubscriber subscriber;
+ protected MessageRouterSubscribeRequest request;
+
+ /**
+ * Constructs the object.
+ *
+ * @param MRTopicParams parameters for the bus topic
+ */
+ protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+ this.fetchTimeout = MRTopicParams.getFetchTimeout();
+
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH);
+ }
+
+ if (MRTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ if (MRTopicParams.isServersInvalid()) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ try{
+ this.subscriber = DcaeDmaapUtil.buildSubscriber();
+ this.request = DcaeDmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Illegal MrConsumer parameters");
+ }
+
+ }
+
+ /**
+ * Try fetch new message. But backoff for some sleepTime when connection fails.
+ * @return
+ * @throws IOException
+ */
+ public List<JsonElement> fetch() throws IOException {
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+ MessageRouterSubscribeResponse resp = responses.block();
+ List<JsonElement> list = resp.items();
+ return list;
+
+ }
+
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
+ try {
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Close the dmaap client and this thread
+ */
+ public void close() {
+ this.closeCondition.countDown();
+ }
+ }
+}