From 107be6ba14cd4f6822c2e2e058a9f0880bd9e3c9 Mon Sep 17 00:00:00 2001 From: ChuanyuChen Date: Mon, 10 Apr 2023 10:37:47 +0800 Subject: Support Dmaap Message Util Support Dmaap Message Util Issue-ID: USECASEUI-794 Signed-off-by: ChuanyuChen Change-Id: If54a17f72370667444fc87129d8fdc0958be8692 --- intentanalysis/pom.xml | 24 ++ .../adapters/dmaap/MRTopicMonitor.java | 240 +++++++++++++ .../adapters/dmaap/MRTopicParams.java | 375 +++++++++++++++++++++ .../adapters/dmaap/NotificationCallback.java | 26 ++ .../usecaseui/intentanalysis/util/DmaapUtil.java | 80 +++++ .../resources/dmaapConfig/dcae_dmaap_config.json | 20 ++ .../resources/dmaapConfig/policy_dmaap_config.json | 20 ++ 7 files changed, 785 insertions(+) create mode 100644 intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java create mode 100644 intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java create mode 100644 intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java create mode 100644 intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java create mode 100644 intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json create mode 100644 intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json diff --git a/intentanalysis/pom.xml b/intentanalysis/pom.xml index ac21f4d..84f93aa 100644 --- a/intentanalysis/pom.xml +++ b/intentanalysis/pom.xml @@ -212,6 +212,30 @@ commons-io 2.7 + + org.onap.dmaap.messagerouter.dmaapclient + dmaapClient + 1.1.12 + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + apache-log4j-extras + log4j + + + + + org.onap.dcaegen2.services.sdk.rest.services + dmaap-client + 1.8.7 + diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java new file mode 100644 index 0000000..4712d0c --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java @@ -0,0 +1,240 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.io.FileUtils; +import org.apache.ibatis.io.Resources; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.usecaseui.intentanalysis.util.DmaapUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * This is a Dmaap message-router topic monitor. + * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic. + * So that new msg can be notified almost immediately. This is the major different from previous implementation. + */ +public class MRTopicMonitor implements Runnable { + + private final String name; + private volatile boolean running = false; + private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class); + private static int DEFAULT_TIMEOUT_MS_FETCH = 15000; + private MRConsumerWrapper consumerWrapper; + private NotificationCallback callback; + + /** + * Constructor + * @param name name of topic subscriber in config + * @param callback callbackfunction for received message + */ + public MRTopicMonitor(String name, NotificationCallback callback){ + this.name = name; + this.callback = callback; + } + + /** + * Start the monitoring thread + */ + public void start(){ + logger.info("Starting Dmaap Bus Monitor"); + try { + File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json"); + String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8); + JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject(); + consumerWrapper = buildConsumerWrapper(jsonObject); + running = true; + Executor executor = Executors.newSingleThreadExecutor(); + executor.execute(this); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + /** + * Main loop that keep fetching and processing + */ + @Override + public void run(){ + while (running){ + try { + logger.debug("Topic: {} getting new msg...", name); + List dmaapMsgs = consumerWrapper.fetch(); + for (JsonElement msg : dmaapMsgs){ + logger.debug("Received message: {}" + + "\r\n and processing start", msg); + process(msg.toString()); + } + } catch (IOException | RuntimeException e){ + logger.error("fetchMessage encountered error: {}", e); + } + } + logger.info("{}: exiting thread", this); + } + + /** + * Stop the monitor + */ + public void stop(){ + logger.info("{}: exiting", this); + running = false; + this.consumerWrapper.close(); + this.consumerWrapper = null; + } + + private void process(String msg) { + try { + callback.activateCallBack(msg); + } catch (Exception e){ + logger.error("process message encountered error: {}", e); + } + } + + private List fetch() throws IOException { + return this.consumerWrapper.fetch(); + } + + private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson ) + throws IllegalArgumentException { + MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build(); + return new MRConsumerWrapper(topicParams); + } + + /** + * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client) + * A polling fashion dmaap consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny. + * It supports both https and http protocols. + */ + private class MRConsumerWrapper { + /** + * Name of the "protocol" property. + */ + protected static final String PROTOCOL_PROP = "Protocol"; + /** + * Fetch timeout. + */ + protected int fetchTimeout; + + /** + * Time to sleep on a fetch failure. + */ + @Getter + private final int sleepTime; + + /** + * Counted down when {@link #close()} is invoked. + */ + private final CountDownLatch closeCondition = new CountDownLatch(1); + + protected MessageRouterSubscriber subscriber; + protected MessageRouterSubscribeRequest request; + + /** + * Constructs the object. + * + * @param MRTopicParams parameters for the bus topic + */ + protected MRConsumerWrapper(MRTopicParams MRTopicParams) { + this.fetchTimeout = MRTopicParams.getFetchTimeout(); + + if (this.fetchTimeout <= 0) { + this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH); + } + + if (MRTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + if (MRTopicParams.isServersInvalid()) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + try{ + this.subscriber = DmaapUtil.buildSubscriber(); + this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic()); + + } catch (Exception e) { + throw new IllegalArgumentException("Illegal MrConsumer parameters"); + } + + } + + /** + * Try fetch new message. But backoff for some sleepTime when connection fails. + * @return + * @throws IOException + */ + public List fetch() throws IOException { + Mono responses = this.subscriber.get(this.request); + MessageRouterSubscribeResponse resp = responses.block(); + List list = resp.items(); + return list; + + } + + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { + try { + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } + + } catch (InterruptedException e) { + logger.warn("{}: interrupted while handling fetch error", this, e); + Thread.currentThread().interrupt(); + } + } + + /** + * Close the dmaap client and this thread + */ + public void close() { + this.closeCondition.countDown(); + } + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java new file mode 100644 index 0000000..7e2b3b6 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java @@ -0,0 +1,375 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; + +/** + * Partially copied from Onap Policy + * policy/common/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java + * Modified to fit this project. + * Member variables of this Params class are as follows. + * + *

servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * consumerGroup DMaaP Reader Consumer Group + * consumerInstance DMaaP Reader Instance + * fetchTimeout DMaaP fetch timeout + * fetchLimit DMaaP fetch limit + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + */ +@Getter +@Setter +public class MRTopicParams { + + private int port; + private List servers; + private Map additionalProps; + private String topic; + private String effectiveTopic; + private String apiKey; + private String apiSecret; + private String consumerGroup; + private String consumerInstance; + private int fetchTimeout; + private int fetchLimit; + private boolean useHttps; + private boolean allowSelfSignedCerts; + private boolean managed; + + private String userName; + private String password; + private String environment; + private String aftEnvironment; + private String partner; + private String latitude; + private String longitude; + private String partitionId; + private String clientName; + private String hostname; + private String basePath; + @Getter + private String serializationProvider; + + public static TopicParamsBuilder builder() { + return new TopicParamsBuilder(); + } + + /** + * Methods to Check if the property is INVALID. + */ + + public boolean isEnvironmentInvalid() { + return StringUtils.isBlank(environment); + } + + public boolean isAftEnvironmentInvalid() { + return StringUtils.isBlank(aftEnvironment); + } + + public boolean isLatitudeInvalid() { + return StringUtils.isBlank(latitude); + } + + public boolean isLongitudeInvalid() { + return StringUtils.isBlank(longitude); + } + + public boolean isConsumerInstanceInvalid() { + return StringUtils.isBlank(consumerInstance); + } + + public boolean isConsumerGroupInvalid() { + return StringUtils.isBlank(consumerGroup); + } + + public boolean isClientNameInvalid() { + return StringUtils.isBlank(clientName); + } + + public boolean isPartnerInvalid() { + return StringUtils.isBlank(partner); + } + + public boolean isServersInvalid() { + return (servers == null || servers.isEmpty() + || (servers.size() == 1 && ("".equals(servers.get(0))))); + } + + public boolean isTopicInvalid() { + return StringUtils.isBlank(topic); + } + + public boolean isPartitionIdInvalid() { + return StringUtils.isBlank(partitionId); + } + + public boolean isHostnameInvalid() { + return StringUtils.isBlank(hostname); + } + + public boolean isPortInvalid() { + return (port <= 0 || port >= 65535); + } + + /** + * Methods to Check if the property is Valid. + */ + + public boolean isApiKeyValid() { + return StringUtils.isNotBlank(apiKey); + } + + public boolean isApiSecretValid() { + return StringUtils.isNotBlank(apiSecret); + } + + public boolean isUserNameValid() { + return StringUtils.isNotBlank(userName); + } + + public boolean isPasswordValid() { + return StringUtils.isNotBlank(password); + } + + public boolean isAdditionalPropsValid() { + return additionalProps != null; + } + + @NoArgsConstructor(access = AccessLevel.PRIVATE) + public static class TopicParamsBuilder { + + final MRTopicParams params = new MRTopicParams(); + + public TopicParamsBuilder servers(List servers) { + this.params.servers = servers; + return this; + } + + public TopicParamsBuilder topic(String topic) { + this.params.topic = topic; + return this; + } + + public TopicParamsBuilder effectiveTopic(String effectiveTopic) { + this.params.effectiveTopic = effectiveTopic; + return this; + } + + public TopicParamsBuilder apiKey(String apiKey) { + this.params.apiKey = apiKey; + return this; + } + + public TopicParamsBuilder apiSecret(String apiSecret) { + this.params.apiSecret = apiSecret; + return this; + } + + public TopicParamsBuilder consumerGroup(String consumerGroup) { + this.params.consumerGroup = consumerGroup; + return this; + } + + public TopicParamsBuilder consumerInstance(String consumerInstance) { + this.params.consumerInstance = consumerInstance; + return this; + } + + public TopicParamsBuilder fetchTimeout(int fetchTimeout) { + this.params.fetchTimeout = fetchTimeout; + return this; + } + + public TopicParamsBuilder fetchLimit(int fetchLimit) { + this.params.fetchLimit = fetchLimit; + return this; + } + + public TopicParamsBuilder useHttps(boolean useHttps) { + this.params.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { + this.params.allowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public TopicParamsBuilder userName(String userName) { + this.params.userName = userName; + return this; + } + + public TopicParamsBuilder password(String password) { + this.params.password = password; + return this; + } + + public TopicParamsBuilder environment(String environment) { + this.params.environment = environment; + return this; + } + + public TopicParamsBuilder aftEnvironment(String aftEnvironment) { + this.params.aftEnvironment = aftEnvironment; + return this; + } + + public TopicParamsBuilder partner(String partner) { + this.params.partner = partner; + return this; + } + + public TopicParamsBuilder latitude(String latitude) { + this.params.latitude = latitude; + return this; + } + + public TopicParamsBuilder longitude(String longitude) { + this.params.longitude = longitude; + return this; + } + + public TopicParamsBuilder additionalProps(Map additionalProps) { + this.params.additionalProps = additionalProps; + return this; + } + + public TopicParamsBuilder partitionId(String partitionId) { + this.params.partitionId = partitionId; + return this; + } + + public MRTopicParams build() { + return params; + } + + public TopicParamsBuilder buildFromConfigJson(JsonObject jsonObject) { + String consumerGroup = null; + String consumerInstance = null; + String aafUsername = null; + String aafPassword = null; + List servers = new ArrayList<>(); + String topic = null; + boolean useHttps = false; + int fetchTimeout = -1; + int fetchLimit = -1; + + if (jsonObject.has("consumer_group") && !jsonObject.get("consumer_group").isJsonNull()) { + consumerGroup = jsonObject.get("consumer_group").getAsString(); + } + if (jsonObject.has("consumer_instance") && !jsonObject.get("consumer_instance").isJsonNull()) { + consumerInstance = jsonObject.get("consumer_instance").getAsString(); + } + if (jsonObject.has("aaf_username") && !jsonObject.get("aaf_username").isJsonNull()) { + aafUsername = jsonObject.get("aaf_username").getAsString(); + } + if (jsonObject.has("aaf_password") && !jsonObject.get("aaf_password").isJsonNull()) { + aafPassword = jsonObject.get("aaf_password").getAsString(); + } + if (jsonObject.has("fetch_timeout") && !jsonObject.get("fetch_timeout").isJsonNull()) { + fetchTimeout = jsonObject.get("fetch_timeout").getAsInt(); + } + if (jsonObject.has("fetch_limit") && !jsonObject.get("fetch_limit").isJsonNull()) { + fetchLimit = jsonObject.get("fetch_limit").getAsInt(); + } + if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) { + JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray(); + servers = new ArrayList<>(); + for (int i=0, e=jsonArray.size(); i