diff options
author | ChuanyuChen <chenchuanyu@huawei.com> | 2023-04-10 10:37:47 +0800 |
---|---|---|
committer | ChuanyuChen <chenchuanyu@huawei.com> | 2023-04-10 10:37:47 +0800 |
commit | 107be6ba14cd4f6822c2e2e058a9f0880bd9e3c9 (patch) | |
tree | 20f89cbe8bb73e4b96ad3599653c2f6af43fff09 | |
parent | cd69d6ff83ee0d06683031f04ed71c692b6bffaa (diff) |
Support Dmaap Message Util
Support Dmaap Message Util
Issue-ID: USECASEUI-794
Signed-off-by: ChuanyuChen <chenchuanyu@huawei.com>
Change-Id: If54a17f72370667444fc87129d8fdc0958be8692
7 files changed, 785 insertions, 0 deletions
diff --git a/intentanalysis/pom.xml b/intentanalysis/pom.xml index ac21f4d..84f93aa 100644 --- a/intentanalysis/pom.xml +++ b/intentanalysis/pom.xml @@ -212,6 +212,30 @@ <artifactId>commons-io</artifactId> <version>2.7</version> </dependency> + <dependency> + <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId> + <artifactId>dmaapClient</artifactId> + <version>1.1.12</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <artifactId>apache-log4j-extras</artifactId> + <groupId>log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>dmaap-client</artifactId> + <version>1.8.7</version> + </dependency> </dependencies> <build> <plugins> diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java new file mode 100644 index 0000000..4712d0c --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java @@ -0,0 +1,240 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.collection.List; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.io.FileUtils; +import org.apache.ibatis.io.Resources; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import org.onap.usecaseui.intentanalysis.util.DmaapUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * This is a Dmaap message-router topic monitor. + * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic. + * So that new msg can be notified almost immediately. This is the major different from previous implementation. + */ +public class MRTopicMonitor implements Runnable { + + private final String name; + private volatile boolean running = false; + private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class); + private static int DEFAULT_TIMEOUT_MS_FETCH = 15000; + private MRConsumerWrapper consumerWrapper; + private NotificationCallback callback; + + /** + * Constructor + * @param name name of topic subscriber in config + * @param callback callbackfunction for received message + */ + public MRTopicMonitor(String name, NotificationCallback callback){ + this.name = name; + this.callback = callback; + } + + /** + * Start the monitoring thread + */ + public void start(){ + logger.info("Starting Dmaap Bus Monitor"); + try { + File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json"); + String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8); + JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject(); + consumerWrapper = buildConsumerWrapper(jsonObject); + running = true; + Executor executor = Executors.newSingleThreadExecutor(); + executor.execute(this); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + /** + * Main loop that keep fetching and processing + */ + @Override + public void run(){ + while (running){ + try { + logger.debug("Topic: {} getting new msg...", name); + List<JsonElement> dmaapMsgs = consumerWrapper.fetch(); + for (JsonElement msg : dmaapMsgs){ + logger.debug("Received message: {}" + + "\r\n and processing start", msg); + process(msg.toString()); + } + } catch (IOException | RuntimeException e){ + logger.error("fetchMessage encountered error: {}", e); + } + } + logger.info("{}: exiting thread", this); + } + + /** + * Stop the monitor + */ + public void stop(){ + logger.info("{}: exiting", this); + running = false; + this.consumerWrapper.close(); + this.consumerWrapper = null; + } + + private void process(String msg) { + try { + callback.activateCallBack(msg); + } catch (Exception e){ + logger.error("process message encountered error: {}", e); + } + } + + private List<JsonElement> fetch() throws IOException { + return this.consumerWrapper.fetch(); + } + + private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson ) + throws IllegalArgumentException { + MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build(); + return new MRConsumerWrapper(topicParams); + } + + /** + * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client) + * A polling fashion dmaap consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny. + * It supports both https and http protocols. + */ + private class MRConsumerWrapper { + /** + * Name of the "protocol" property. + */ + protected static final String PROTOCOL_PROP = "Protocol"; + /** + * Fetch timeout. + */ + protected int fetchTimeout; + + /** + * Time to sleep on a fetch failure. + */ + @Getter + private final int sleepTime; + + /** + * Counted down when {@link #close()} is invoked. + */ + private final CountDownLatch closeCondition = new CountDownLatch(1); + + protected MessageRouterSubscriber subscriber; + protected MessageRouterSubscribeRequest request; + + /** + * Constructs the object. + * + * @param MRTopicParams parameters for the bus topic + */ + protected MRConsumerWrapper(MRTopicParams MRTopicParams) { + this.fetchTimeout = MRTopicParams.getFetchTimeout(); + + if (this.fetchTimeout <= 0) { + this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH); + } + + if (MRTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + if (MRTopicParams.isServersInvalid()) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + try{ + this.subscriber = DmaapUtil.buildSubscriber(); + this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic()); + + } catch (Exception e) { + throw new IllegalArgumentException("Illegal MrConsumer parameters"); + } + + } + + /** + * Try fetch new message. But backoff for some sleepTime when connection fails. + * @return + * @throws IOException + */ + public List<JsonElement> fetch() throws IOException { + Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request); + MessageRouterSubscribeResponse resp = responses.block(); + List<JsonElement> list = resp.items(); + return list; + + } + + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { + try { + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } + + } catch (InterruptedException e) { + logger.warn("{}: interrupted while handling fetch error", this, e); + Thread.currentThread().interrupt(); + } + } + + /** + * Close the dmaap client and this thread + */ + public void close() { + this.closeCondition.countDown(); + } + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java new file mode 100644 index 0000000..7e2b3b6 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java @@ -0,0 +1,375 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved. + * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2022 Huawei Canada Limited. + * Copyright (C) 2022 CTC, Inc. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; + +/** + * Partially copied from Onap Policy + * policy/common/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java + * Modified to fit this project. + * Member variables of this Params class are as follows. + * + * <p>servers DMaaP servers + * topic DMaaP Topic to be monitored + * apiKey DMaaP API Key (optional) + * apiSecret DMaaP API Secret (optional) + * consumerGroup DMaaP Reader Consumer Group + * consumerInstance DMaaP Reader Instance + * fetchTimeout DMaaP fetch timeout + * fetchLimit DMaaP fetch limit + * environment DME2 Environment + * aftEnvironment DME2 AFT Environment + * partner DME2 Partner + * latitude DME2 Latitude + * longitude DME2 Longitude + * additionalProps Additional properties to pass to DME2 + * useHttps does connection use HTTPS? + * allowSelfSignedCerts are self-signed certificates allow + */ +@Getter +@Setter +public class MRTopicParams { + + private int port; + private List<String> servers; + private Map<String, String> additionalProps; + private String topic; + private String effectiveTopic; + private String apiKey; + private String apiSecret; + private String consumerGroup; + private String consumerInstance; + private int fetchTimeout; + private int fetchLimit; + private boolean useHttps; + private boolean allowSelfSignedCerts; + private boolean managed; + + private String userName; + private String password; + private String environment; + private String aftEnvironment; + private String partner; + private String latitude; + private String longitude; + private String partitionId; + private String clientName; + private String hostname; + private String basePath; + @Getter + private String serializationProvider; + + public static TopicParamsBuilder builder() { + return new TopicParamsBuilder(); + } + + /** + * Methods to Check if the property is INVALID. + */ + + public boolean isEnvironmentInvalid() { + return StringUtils.isBlank(environment); + } + + public boolean isAftEnvironmentInvalid() { + return StringUtils.isBlank(aftEnvironment); + } + + public boolean isLatitudeInvalid() { + return StringUtils.isBlank(latitude); + } + + public boolean isLongitudeInvalid() { + return StringUtils.isBlank(longitude); + } + + public boolean isConsumerInstanceInvalid() { + return StringUtils.isBlank(consumerInstance); + } + + public boolean isConsumerGroupInvalid() { + return StringUtils.isBlank(consumerGroup); + } + + public boolean isClientNameInvalid() { + return StringUtils.isBlank(clientName); + } + + public boolean isPartnerInvalid() { + return StringUtils.isBlank(partner); + } + + public boolean isServersInvalid() { + return (servers == null || servers.isEmpty() + || (servers.size() == 1 && ("".equals(servers.get(0))))); + } + + public boolean isTopicInvalid() { + return StringUtils.isBlank(topic); + } + + public boolean isPartitionIdInvalid() { + return StringUtils.isBlank(partitionId); + } + + public boolean isHostnameInvalid() { + return StringUtils.isBlank(hostname); + } + + public boolean isPortInvalid() { + return (port <= 0 || port >= 65535); + } + + /** + * Methods to Check if the property is Valid. + */ + + public boolean isApiKeyValid() { + return StringUtils.isNotBlank(apiKey); + } + + public boolean isApiSecretValid() { + return StringUtils.isNotBlank(apiSecret); + } + + public boolean isUserNameValid() { + return StringUtils.isNotBlank(userName); + } + + public boolean isPasswordValid() { + return StringUtils.isNotBlank(password); + } + + public boolean isAdditionalPropsValid() { + return additionalProps != null; + } + + @NoArgsConstructor(access = AccessLevel.PRIVATE) + public static class TopicParamsBuilder { + + final MRTopicParams params = new MRTopicParams(); + + public TopicParamsBuilder servers(List<String> servers) { + this.params.servers = servers; + return this; + } + + public TopicParamsBuilder topic(String topic) { + this.params.topic = topic; + return this; + } + + public TopicParamsBuilder effectiveTopic(String effectiveTopic) { + this.params.effectiveTopic = effectiveTopic; + return this; + } + + public TopicParamsBuilder apiKey(String apiKey) { + this.params.apiKey = apiKey; + return this; + } + + public TopicParamsBuilder apiSecret(String apiSecret) { + this.params.apiSecret = apiSecret; + return this; + } + + public TopicParamsBuilder consumerGroup(String consumerGroup) { + this.params.consumerGroup = consumerGroup; + return this; + } + + public TopicParamsBuilder consumerInstance(String consumerInstance) { + this.params.consumerInstance = consumerInstance; + return this; + } + + public TopicParamsBuilder fetchTimeout(int fetchTimeout) { + this.params.fetchTimeout = fetchTimeout; + return this; + } + + public TopicParamsBuilder fetchLimit(int fetchLimit) { + this.params.fetchLimit = fetchLimit; + return this; + } + + public TopicParamsBuilder useHttps(boolean useHttps) { + this.params.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { + this.params.allowSelfSignedCerts = allowSelfSignedCerts; + return this; + } + + public TopicParamsBuilder userName(String userName) { + this.params.userName = userName; + return this; + } + + public TopicParamsBuilder password(String password) { + this.params.password = password; + return this; + } + + public TopicParamsBuilder environment(String environment) { + this.params.environment = environment; + return this; + } + + public TopicParamsBuilder aftEnvironment(String aftEnvironment) { + this.params.aftEnvironment = aftEnvironment; + return this; + } + + public TopicParamsBuilder partner(String partner) { + this.params.partner = partner; + return this; + } + + public TopicParamsBuilder latitude(String latitude) { + this.params.latitude = latitude; + return this; + } + + public TopicParamsBuilder longitude(String longitude) { + this.params.longitude = longitude; + return this; + } + + public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) { + this.params.additionalProps = additionalProps; + return this; + } + + public TopicParamsBuilder partitionId(String partitionId) { + this.params.partitionId = partitionId; + return this; + } + + public MRTopicParams build() { + return params; + } + + public TopicParamsBuilder buildFromConfigJson(JsonObject jsonObject) { + String consumerGroup = null; + String consumerInstance = null; + String aafUsername = null; + String aafPassword = null; + List<String> servers = new ArrayList<>(); + String topic = null; + boolean useHttps = false; + int fetchTimeout = -1; + int fetchLimit = -1; + + if (jsonObject.has("consumer_group") && !jsonObject.get("consumer_group").isJsonNull()) { + consumerGroup = jsonObject.get("consumer_group").getAsString(); + } + if (jsonObject.has("consumer_instance") && !jsonObject.get("consumer_instance").isJsonNull()) { + consumerInstance = jsonObject.get("consumer_instance").getAsString(); + } + if (jsonObject.has("aaf_username") && !jsonObject.get("aaf_username").isJsonNull()) { + aafUsername = jsonObject.get("aaf_username").getAsString(); + } + if (jsonObject.has("aaf_password") && !jsonObject.get("aaf_password").isJsonNull()) { + aafPassword = jsonObject.get("aaf_password").getAsString(); + } + if (jsonObject.has("fetch_timeout") && !jsonObject.get("fetch_timeout").isJsonNull()) { + fetchTimeout = jsonObject.get("fetch_timeout").getAsInt(); + } + if (jsonObject.has("fetch_limit") && !jsonObject.get("fetch_limit").isJsonNull()) { + fetchLimit = jsonObject.get("fetch_limit").getAsInt(); + } + if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) { + JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray(); + servers = new ArrayList<>(); + for (int i=0, e=jsonArray.size(); i<e; i++){ + servers.add(jsonArray.get(i).getAsString()); + } + } + + String topicUrl = jsonObject.get("dmaap_info").getAsJsonObject().get("topic_url").getAsString(); + if (topicUrl.startsWith("https")){ + useHttps = true; + } + String[] pmTopicSplit = topicUrl.split("\\/"); + topic = pmTopicSplit[pmTopicSplit.length - 1]; + + this.params.topic = topicUrl; + this.params.servers = servers; + this.params.consumerGroup = consumerGroup; + this.params.consumerInstance = consumerInstance; + this.params.password = aafPassword; + this.params.userName = aafUsername; + this.params.fetchTimeout = fetchTimeout; + this.params.fetchLimit = fetchLimit; + this.params.useHttps = useHttps; + return this; + } + + public TopicParamsBuilder managed(boolean managed) { + this.params.managed = managed; + return this; + } + + public TopicParamsBuilder hostname(String hostname) { + this.params.hostname = hostname; + return this; + } + + public TopicParamsBuilder clientName(String clientName) { + this.params.clientName = clientName; + return this; + } + + public TopicParamsBuilder port(int port) { + this.params.port = port; + return this; + } + + public TopicParamsBuilder basePath(String basePath) { + this.params.basePath = basePath; + return this; + } + + public TopicParamsBuilder serializationProvider(String serializationProvider) { + this.params.serializationProvider = serializationProvider; + return this; + } + + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java new file mode 100644 index 0000000..dc46485 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +/** + * Interface for DmaapNotificationCallback + * + */ +public interface NotificationCallback { + + public abstract void activateCallBack(String msg); + +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java new file mode 100644 index 0000000..c6cbfce --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.util; + +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; + +public class DmaapUtil { + public static MessageRouterSubscriber buildSubscriber(){ + MessageRouterSubscriberConfig connectionPoolConfiguration = ImmutableMessageRouterSubscriberConfig.builder() + .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder() + .connectionPool(16) + .maxIdleTime(10) //in seconds + .maxLifeTime(20) //in seconds + .build()) + .build(); + + MessageRouterSubscriber cut = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration); + return cut; + } + + public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){ + MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() + .name(name) + .topicUrl(topicUrl) + .build(); + MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup("1") + .consumerId("1") + .sourceDefinition(sourceDefinition) + .build(); + + return request; + } + + public static MessageRouterPublisher buildPublisher(){ + MessageRouterPublisher pub = DmaapClientFactory + .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); + return pub; + } + + public static MessageRouterPublishRequest buildPublisherRequest(String name, String topicUrl){ + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .name(name) + .topicUrl(topicUrl) + .build(); + MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.TEXT_PLAIN) + .build(); + return request; + } +} diff --git a/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json new file mode 100644 index 0000000..d22e843 --- /dev/null +++ b/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json @@ -0,0 +1,20 @@ +{ + "dcae_subscriber":{ + "type":"message_router", + "aaf_username": null, + "aaf_password": null, + "api_key" : null, + "api_secret" : null, + "servers" : ["message-router:3904"], + "consumer_group" : "intent_analysis_dcaeevent", + "consumer_instance" : "intent_analysis_dcaeevent_1", + "fetch_timeout" : 15000, + "fetch_limit" : 100, + "dmaap_info":{ + "topic_url":"http://message-router:3904/events/INTENT-EVENT", + "client_role":"org.onap.uui.intentanalysisSub", + "location":"onap", + "client_id":"intent-analysis-1" + } + } +} diff --git a/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json new file mode 100644 index 0000000..0ec440f --- /dev/null +++ b/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json @@ -0,0 +1,20 @@ +{ + "policy_subscriber":{ + "type":"message_router", + "aaf_username": null, + "aaf_password": null, + "api_key" : null, + "api_secret" : null, + "servers" : ["message-router:3904"], + "consumer_group" : "intent_analysis_policyevent", + "consumer_instance" : "intent_analysis_policyevent_1", + "fetch_timeout" : 15000, + "fetch_limit" : 100, + "dmaap_info":{ + "topic_url":"http://message-router:3904/events/INTENT-EVENT", + "client_role":"org.onap.uui.intentanalysisSub", + "location":"onap", + "client_id":"intent-analysis-1" + } + } +} |