aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChuanyuChen <chenchuanyu@huawei.com>2023-04-10 10:37:47 +0800
committerChuanyuChen <chenchuanyu@huawei.com>2023-04-10 10:37:47 +0800
commit107be6ba14cd4f6822c2e2e058a9f0880bd9e3c9 (patch)
tree20f89cbe8bb73e4b96ad3599653c2f6af43fff09
parentcd69d6ff83ee0d06683031f04ed71c692b6bffaa (diff)
Support Dmaap Message Util
Support Dmaap Message Util Issue-ID: USECASEUI-794 Signed-off-by: ChuanyuChen <chenchuanyu@huawei.com> Change-Id: If54a17f72370667444fc87129d8fdc0958be8692
-rw-r--r--intentanalysis/pom.xml24
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java240
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java375
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java26
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java80
-rw-r--r--intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json20
-rw-r--r--intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json20
7 files changed, 785 insertions, 0 deletions
diff --git a/intentanalysis/pom.xml b/intentanalysis/pom.xml
index ac21f4d..84f93aa 100644
--- a/intentanalysis/pom.xml
+++ b/intentanalysis/pom.xml
@@ -212,6 +212,30 @@
<artifactId>commons-io</artifactId>
<version>2.7</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
+ <artifactId>dmaapClient</artifactId>
+ <version>1.1.12</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>apache-log4j-extras</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
+ <version>1.8.7</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
new file mode 100644
index 0000000..4712d0c
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
@@ -0,0 +1,240 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.io.FileUtils;
+import org.apache.ibatis.io.Resources;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.usecaseui.intentanalysis.util.DmaapUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * This is a Dmaap message-router topic monitor.
+ * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic.
+ * So that new msg can be notified almost immediately. This is the major different from previous implementation.
+ */
+public class MRTopicMonitor implements Runnable {
+
+ private final String name;
+ private volatile boolean running = false;
+ private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+ private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+ private MRConsumerWrapper consumerWrapper;
+ private NotificationCallback callback;
+
+ /**
+ * Constructor
+ * @param name name of topic subscriber in config
+ * @param callback callbackfunction for received message
+ */
+ public MRTopicMonitor(String name, NotificationCallback callback){
+ this.name = name;
+ this.callback = callback;
+ }
+
+ /**
+ * Start the monitoring thread
+ */
+ public void start(){
+ logger.info("Starting Dmaap Bus Monitor");
+ try {
+ File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json");
+ String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
+ JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject();
+ consumerWrapper = buildConsumerWrapper(jsonObject);
+ running = true;
+ Executor executor = Executors.newSingleThreadExecutor();
+ executor.execute(this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Main loop that keep fetching and processing
+ */
+ @Override
+ public void run(){
+ while (running){
+ try {
+ logger.debug("Topic: {} getting new msg...", name);
+ List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
+ for (JsonElement msg : dmaapMsgs){
+ logger.debug("Received message: {}" +
+ "\r\n and processing start", msg);
+ process(msg.toString());
+ }
+ } catch (IOException | RuntimeException e){
+ logger.error("fetchMessage encountered error: {}", e);
+ }
+ }
+ logger.info("{}: exiting thread", this);
+ }
+
+ /**
+ * Stop the monitor
+ */
+ public void stop(){
+ logger.info("{}: exiting", this);
+ running = false;
+ this.consumerWrapper.close();
+ this.consumerWrapper = null;
+ }
+
+ private void process(String msg) {
+ try {
+ callback.activateCallBack(msg);
+ } catch (Exception e){
+ logger.error("process message encountered error: {}", e);
+ }
+ }
+
+ private List<JsonElement> fetch() throws IOException {
+ return this.consumerWrapper.fetch();
+ }
+
+ private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
+ throws IllegalArgumentException {
+ MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
+ return new MRConsumerWrapper(topicParams);
+ }
+
+ /**
+ * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client)
+ * A polling fashion dmaap consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny.
+ * It supports both https and http protocols.
+ */
+ private class MRConsumerWrapper {
+ /**
+ * Name of the "protocol" property.
+ */
+ protected static final String PROTOCOL_PROP = "Protocol";
+ /**
+ * Fetch timeout.
+ */
+ protected int fetchTimeout;
+
+ /**
+ * Time to sleep on a fetch failure.
+ */
+ @Getter
+ private final int sleepTime;
+
+ /**
+ * Counted down when {@link #close()} is invoked.
+ */
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
+
+ protected MessageRouterSubscriber subscriber;
+ protected MessageRouterSubscribeRequest request;
+
+ /**
+ * Constructs the object.
+ *
+ * @param MRTopicParams parameters for the bus topic
+ */
+ protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+ this.fetchTimeout = MRTopicParams.getFetchTimeout();
+
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH);
+ }
+
+ if (MRTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ if (MRTopicParams.isServersInvalid()) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ try{
+ this.subscriber = DmaapUtil.buildSubscriber();
+ this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Illegal MrConsumer parameters");
+ }
+
+ }
+
+ /**
+ * Try fetch new message. But backoff for some sleepTime when connection fails.
+ * @return
+ * @throws IOException
+ */
+ public List<JsonElement> fetch() throws IOException {
+ Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+ MessageRouterSubscribeResponse resp = responses.block();
+ List<JsonElement> list = resp.items();
+ return list;
+
+ }
+
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
+ try {
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Close the dmaap client and this thread
+ */
+ public void close() {
+ this.closeCondition.countDown();
+ }
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
new file mode 100644
index 0000000..7e2b3b6
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
@@ -0,0 +1,375 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Partially copied from Onap Policy
+ * policy/common/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+ * Modified to fit this project.
+ * Member variables of this Params class are as follows.
+ *
+ * <p>servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ */
+@Getter
+@Setter
+public class MRTopicParams {
+
+ private int port;
+ private List<String> servers;
+ private Map<String, String> additionalProps;
+ private String topic;
+ private String effectiveTopic;
+ private String apiKey;
+ private String apiSecret;
+ private String consumerGroup;
+ private String consumerInstance;
+ private int fetchTimeout;
+ private int fetchLimit;
+ private boolean useHttps;
+ private boolean allowSelfSignedCerts;
+ private boolean managed;
+
+ private String userName;
+ private String password;
+ private String environment;
+ private String aftEnvironment;
+ private String partner;
+ private String latitude;
+ private String longitude;
+ private String partitionId;
+ private String clientName;
+ private String hostname;
+ private String basePath;
+ @Getter
+ private String serializationProvider;
+
+ public static TopicParamsBuilder builder() {
+ return new TopicParamsBuilder();
+ }
+
+ /**
+ * Methods to Check if the property is INVALID.
+ */
+
+ public boolean isEnvironmentInvalid() {
+ return StringUtils.isBlank(environment);
+ }
+
+ public boolean isAftEnvironmentInvalid() {
+ return StringUtils.isBlank(aftEnvironment);
+ }
+
+ public boolean isLatitudeInvalid() {
+ return StringUtils.isBlank(latitude);
+ }
+
+ public boolean isLongitudeInvalid() {
+ return StringUtils.isBlank(longitude);
+ }
+
+ public boolean isConsumerInstanceInvalid() {
+ return StringUtils.isBlank(consumerInstance);
+ }
+
+ public boolean isConsumerGroupInvalid() {
+ return StringUtils.isBlank(consumerGroup);
+ }
+
+ public boolean isClientNameInvalid() {
+ return StringUtils.isBlank(clientName);
+ }
+
+ public boolean isPartnerInvalid() {
+ return StringUtils.isBlank(partner);
+ }
+
+ public boolean isServersInvalid() {
+ return (servers == null || servers.isEmpty()
+ || (servers.size() == 1 && ("".equals(servers.get(0)))));
+ }
+
+ public boolean isTopicInvalid() {
+ return StringUtils.isBlank(topic);
+ }
+
+ public boolean isPartitionIdInvalid() {
+ return StringUtils.isBlank(partitionId);
+ }
+
+ public boolean isHostnameInvalid() {
+ return StringUtils.isBlank(hostname);
+ }
+
+ public boolean isPortInvalid() {
+ return (port <= 0 || port >= 65535);
+ }
+
+ /**
+ * Methods to Check if the property is Valid.
+ */
+
+ public boolean isApiKeyValid() {
+ return StringUtils.isNotBlank(apiKey);
+ }
+
+ public boolean isApiSecretValid() {
+ return StringUtils.isNotBlank(apiSecret);
+ }
+
+ public boolean isUserNameValid() {
+ return StringUtils.isNotBlank(userName);
+ }
+
+ public boolean isPasswordValid() {
+ return StringUtils.isNotBlank(password);
+ }
+
+ public boolean isAdditionalPropsValid() {
+ return additionalProps != null;
+ }
+
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
+ public static class TopicParamsBuilder {
+
+ final MRTopicParams params = new MRTopicParams();
+
+ public TopicParamsBuilder servers(List<String> servers) {
+ this.params.servers = servers;
+ return this;
+ }
+
+ public TopicParamsBuilder topic(String topic) {
+ this.params.topic = topic;
+ return this;
+ }
+
+ public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
+ this.params.effectiveTopic = effectiveTopic;
+ return this;
+ }
+
+ public TopicParamsBuilder apiKey(String apiKey) {
+ this.params.apiKey = apiKey;
+ return this;
+ }
+
+ public TopicParamsBuilder apiSecret(String apiSecret) {
+ this.params.apiSecret = apiSecret;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerGroup(String consumerGroup) {
+ this.params.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public TopicParamsBuilder consumerInstance(String consumerInstance) {
+ this.params.consumerInstance = consumerInstance;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+ this.params.fetchTimeout = fetchTimeout;
+ return this;
+ }
+
+ public TopicParamsBuilder fetchLimit(int fetchLimit) {
+ this.params.fetchLimit = fetchLimit;
+ return this;
+ }
+
+ public TopicParamsBuilder useHttps(boolean useHttps) {
+ this.params.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+ this.params.allowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public TopicParamsBuilder userName(String userName) {
+ this.params.userName = userName;
+ return this;
+ }
+
+ public TopicParamsBuilder password(String password) {
+ this.params.password = password;
+ return this;
+ }
+
+ public TopicParamsBuilder environment(String environment) {
+ this.params.environment = environment;
+ return this;
+ }
+
+ public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+ this.params.aftEnvironment = aftEnvironment;
+ return this;
+ }
+
+ public TopicParamsBuilder partner(String partner) {
+ this.params.partner = partner;
+ return this;
+ }
+
+ public TopicParamsBuilder latitude(String latitude) {
+ this.params.latitude = latitude;
+ return this;
+ }
+
+ public TopicParamsBuilder longitude(String longitude) {
+ this.params.longitude = longitude;
+ return this;
+ }
+
+ public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+ this.params.additionalProps = additionalProps;
+ return this;
+ }
+
+ public TopicParamsBuilder partitionId(String partitionId) {
+ this.params.partitionId = partitionId;
+ return this;
+ }
+
+ public MRTopicParams build() {
+ return params;
+ }
+
+ public TopicParamsBuilder buildFromConfigJson(JsonObject jsonObject) {
+ String consumerGroup = null;
+ String consumerInstance = null;
+ String aafUsername = null;
+ String aafPassword = null;
+ List<String> servers = new ArrayList<>();
+ String topic = null;
+ boolean useHttps = false;
+ int fetchTimeout = -1;
+ int fetchLimit = -1;
+
+ if (jsonObject.has("consumer_group") && !jsonObject.get("consumer_group").isJsonNull()) {
+ consumerGroup = jsonObject.get("consumer_group").getAsString();
+ }
+ if (jsonObject.has("consumer_instance") && !jsonObject.get("consumer_instance").isJsonNull()) {
+ consumerInstance = jsonObject.get("consumer_instance").getAsString();
+ }
+ if (jsonObject.has("aaf_username") && !jsonObject.get("aaf_username").isJsonNull()) {
+ aafUsername = jsonObject.get("aaf_username").getAsString();
+ }
+ if (jsonObject.has("aaf_password") && !jsonObject.get("aaf_password").isJsonNull()) {
+ aafPassword = jsonObject.get("aaf_password").getAsString();
+ }
+ if (jsonObject.has("fetch_timeout") && !jsonObject.get("fetch_timeout").isJsonNull()) {
+ fetchTimeout = jsonObject.get("fetch_timeout").getAsInt();
+ }
+ if (jsonObject.has("fetch_limit") && !jsonObject.get("fetch_limit").isJsonNull()) {
+ fetchLimit = jsonObject.get("fetch_limit").getAsInt();
+ }
+ if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) {
+ JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray();
+ servers = new ArrayList<>();
+ for (int i=0, e=jsonArray.size(); i<e; i++){
+ servers.add(jsonArray.get(i).getAsString());
+ }
+ }
+
+ String topicUrl = jsonObject.get("dmaap_info").getAsJsonObject().get("topic_url").getAsString();
+ if (topicUrl.startsWith("https")){
+ useHttps = true;
+ }
+ String[] pmTopicSplit = topicUrl.split("\\/");
+ topic = pmTopicSplit[pmTopicSplit.length - 1];
+
+ this.params.topic = topicUrl;
+ this.params.servers = servers;
+ this.params.consumerGroup = consumerGroup;
+ this.params.consumerInstance = consumerInstance;
+ this.params.password = aafPassword;
+ this.params.userName = aafUsername;
+ this.params.fetchTimeout = fetchTimeout;
+ this.params.fetchLimit = fetchLimit;
+ this.params.useHttps = useHttps;
+ return this;
+ }
+
+ public TopicParamsBuilder managed(boolean managed) {
+ this.params.managed = managed;
+ return this;
+ }
+
+ public TopicParamsBuilder hostname(String hostname) {
+ this.params.hostname = hostname;
+ return this;
+ }
+
+ public TopicParamsBuilder clientName(String clientName) {
+ this.params.clientName = clientName;
+ return this;
+ }
+
+ public TopicParamsBuilder port(int port) {
+ this.params.port = port;
+ return this;
+ }
+
+ public TopicParamsBuilder basePath(String basePath) {
+ this.params.basePath = basePath;
+ return this;
+ }
+
+ public TopicParamsBuilder serializationProvider(String serializationProvider) {
+ this.params.serializationProvider = serializationProvider;
+ return this;
+ }
+
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java
new file mode 100644
index 0000000..dc46485
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+/**
+ * Interface for DmaapNotificationCallback
+ *
+ */
+public interface NotificationCallback {
+
+ public abstract void activateCallBack(String msg);
+
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
new file mode 100644
index 0000000..c6cbfce
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.util;
+
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+
+public class DmaapUtil {
+ public static MessageRouterSubscriber buildSubscriber(){
+ MessageRouterSubscriberConfig connectionPoolConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+ .connectionPool(16)
+ .maxIdleTime(10) //in seconds
+ .maxLifeTime(20) //in seconds
+ .build())
+ .build();
+
+ MessageRouterSubscriber cut = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration);
+ return cut;
+ }
+
+ public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+ MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name(name)
+ .topicUrl(topicUrl)
+ .build();
+ MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup("1")
+ .consumerId("1")
+ .sourceDefinition(sourceDefinition)
+ .build();
+
+ return request;
+ }
+
+ public static MessageRouterPublisher buildPublisher(){
+ MessageRouterPublisher pub = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ return pub;
+ }
+
+ public static MessageRouterPublishRequest buildPublisherRequest(String name, String topicUrl){
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .name(name)
+ .topicUrl(topicUrl)
+ .build();
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.TEXT_PLAIN)
+ .build();
+ return request;
+ }
+}
diff --git a/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json
new file mode 100644
index 0000000..d22e843
--- /dev/null
+++ b/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json
@@ -0,0 +1,20 @@
+{
+ "dcae_subscriber":{
+ "type":"message_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "api_key" : null,
+ "api_secret" : null,
+ "servers" : ["message-router:3904"],
+ "consumer_group" : "intent_analysis_dcaeevent",
+ "consumer_instance" : "intent_analysis_dcaeevent_1",
+ "fetch_timeout" : 15000,
+ "fetch_limit" : 100,
+ "dmaap_info":{
+ "topic_url":"http://message-router:3904/events/INTENT-EVENT",
+ "client_role":"org.onap.uui.intentanalysisSub",
+ "location":"onap",
+ "client_id":"intent-analysis-1"
+ }
+ }
+}
diff --git a/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json
new file mode 100644
index 0000000..0ec440f
--- /dev/null
+++ b/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json
@@ -0,0 +1,20 @@
+{
+ "policy_subscriber":{
+ "type":"message_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "api_key" : null,
+ "api_secret" : null,
+ "servers" : ["message-router:3904"],
+ "consumer_group" : "intent_analysis_policyevent",
+ "consumer_instance" : "intent_analysis_policyevent_1",
+ "fetch_timeout" : 15000,
+ "fetch_limit" : 100,
+ "dmaap_info":{
+ "topic_url":"http://message-router:3904/events/INTENT-EVENT",
+ "client_role":"org.onap.uui.intentanalysisSub",
+ "location":"onap",
+ "client_id":"intent-analysis-1"
+ }
+ }
+}