diff options
author | Keguang He <hekeguang@chinamobile.com> | 2023-04-11 05:51:45 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2023-04-11 05:51:45 +0000 |
commit | a893668c5c913ccb6cc3189fdce3331f866cd700 (patch) | |
tree | f57edf494f45895269199f482c263e657012fb68 /intentanalysis/src/main/java | |
parent | 1d7b2ee4fa358da12c9a515c7d15a765d9fb7004 (diff) | |
parent | ef6560bbb9bfa6248bf2afead5a9d02338aaf02f (diff) |
Merge "Add DCAE and Policy Dmaap Listener Service"
Diffstat (limited to 'intentanalysis/src/main/java')
9 files changed, 233 insertions, 27 deletions
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java new file mode 100644 index 0000000..158cf91 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap; + +import com.google.gson.Gson; +import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback; +import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel; + +public class DCAENotificationCallback implements NotificationCallback { + + @Override + public void activateCallBack(String msg) { + NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class); + + //Todo analyze the event and Report to the Intent Flow; + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java new file mode 100644 index 0000000..9ae8483 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap; + +import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor; +import org.onap.usecaseui.intentanalysis.adapters.policy.dmaap.PolicyNotificationCallback; +import org.springframework.stereotype.Service; + +@Service +public class DCAENotificationService { + + //config of policy dmaap event subscribe + private static final String MONITOR_CONFIG_FILE = "dcae_dmaap_config.json"; + + public DCAENotificationService(){ + init(); + } + + private void init(){ + MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new DCAENotificationCallback()); + monitor.start(); + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java index 4712d0c..528805d 100644 --- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java @@ -21,7 +21,6 @@ package org.onap.usecaseui.intentanalysis.adapters.dmaap; -import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -29,7 +28,6 @@ import io.vavr.collection.List; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -53,30 +51,36 @@ import reactor.core.publisher.Mono; */ public class MRTopicMonitor implements Runnable { - private final String name; + private final String configFileName; + private volatile boolean running = false; + private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class); + private static int DEFAULT_TIMEOUT_MS_FETCH = 15000; + private MRConsumerWrapper consumerWrapper; + private NotificationCallback callback; /** * Constructor - * @param name name of topic subscriber in config + * + * @param configFileName name of topic subscriber file * @param callback callbackfunction for received message */ - public MRTopicMonitor(String name, NotificationCallback callback){ - this.name = name; + public MRTopicMonitor(String configFileName, NotificationCallback callback) { + this.configFileName = configFileName; this.callback = callback; } /** * Start the monitoring thread */ - public void start(){ + public void start() { logger.info("Starting Dmaap Bus Monitor"); try { - File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json"); + File configFile = Resources.getResourceAsFile("dmaapConfig/" + configFileName); String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8); JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject(); consumerWrapper = buildConsumerWrapper(jsonObject); @@ -93,17 +97,16 @@ public class MRTopicMonitor implements Runnable { * Main loop that keep fetching and processing */ @Override - public void run(){ - while (running){ + public void run() { + while (running) { try { - logger.debug("Topic: {} getting new msg...", name); + logger.debug("Topic: {} getting new msg...", consumerWrapper.getTopicName()); List<JsonElement> dmaapMsgs = consumerWrapper.fetch(); - for (JsonElement msg : dmaapMsgs){ - logger.debug("Received message: {}" + - "\r\n and processing start", msg); + for (JsonElement msg : dmaapMsgs) { + logger.debug("Received message: {}" + "\r\n and processing start", msg); process(msg.toString()); } - } catch (IOException | RuntimeException e){ + } catch (IOException | RuntimeException e) { logger.error("fetchMessage encountered error: {}", e); } } @@ -113,7 +116,7 @@ public class MRTopicMonitor implements Runnable { /** * Stop the monitor */ - public void stop(){ + public void stop() { logger.info("{}: exiting", this); running = false; this.consumerWrapper.close(); @@ -123,7 +126,7 @@ public class MRTopicMonitor implements Runnable { private void process(String msg) { try { callback.activateCallBack(msg); - } catch (Exception e){ + } catch (Exception e) { logger.error("process message encountered error: {}", e); } } @@ -132,8 +135,8 @@ public class MRTopicMonitor implements Runnable { return this.consumerWrapper.fetch(); } - private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson ) - throws IllegalArgumentException { + private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson) + throws IllegalArgumentException { MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build(); return new MRConsumerWrapper(topicParams); } @@ -148,6 +151,7 @@ public class MRTopicMonitor implements Runnable { * Name of the "protocol" property. */ protected static final String PROTOCOL_PROP = "Protocol"; + /** * Fetch timeout. */ @@ -160,11 +164,18 @@ public class MRTopicMonitor implements Runnable { private final int sleepTime; /** + * Topic Name to Subscribe + */ + @Getter + private String topicName; + + /** * Counted down when {@link #close()} is invoked. */ private final CountDownLatch closeCondition = new CountDownLatch(1); protected MessageRouterSubscriber subscriber; + protected MessageRouterSubscribeRequest request; /** @@ -173,6 +184,7 @@ public class MRTopicMonitor implements Runnable { * @param MRTopicParams parameters for the bus topic */ protected MRConsumerWrapper(MRTopicParams MRTopicParams) { + this.topicName = MRTopicParams.getTopicName(); this.fetchTimeout = MRTopicParams.getFetchTimeout(); if (this.fetchTimeout <= 0) { @@ -190,9 +202,10 @@ public class MRTopicMonitor implements Runnable { throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); } - try{ + try { this.subscriber = DmaapUtil.buildSubscriber(); - this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic()); + this.request = DmaapUtil.buildSubscriberRequest(topicName + "-Subscriber", MRTopicParams.getTopic(), + MRTopicParams.getConsumerGroup(), MRTopicParams.getConsumerInstance()); } catch (Exception e) { throw new IllegalArgumentException("Illegal MrConsumer parameters"); @@ -202,6 +215,7 @@ public class MRTopicMonitor implements Runnable { /** * Try fetch new message. But backoff for some sleepTime when connection fails. + * * @return * @throws IOException */ diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java index 7e2b3b6..6dcb932 100644 --- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java @@ -176,6 +176,14 @@ public class MRTopicParams { return additionalProps != null; } + public String getTopicName(){ + if(null == this.topic){ + return ""; + } + String[] pmTopicSplit = this.topic.split("\\/"); + return pmTopicSplit[pmTopicSplit.length - 1]; + } + @NoArgsConstructor(access = AccessLevel.PRIVATE) public static class TopicParamsBuilder { @@ -326,8 +334,6 @@ public class MRTopicParams { if (topicUrl.startsWith("https")){ useHttps = true; } - String[] pmTopicSplit = topicUrl.split("\\/"); - topic = pmTopicSplit[pmTopicSplit.length - 1]; this.params.topic = topicUrl; this.params.servers = servers; @@ -370,6 +376,5 @@ public class MRTopicParams { this.params.serializationProvider = serializationProvider; return this; } - } } diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java new file mode 100644 index 0000000..3dce969 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +public class NotificationEventEntity { + //The entity id , currently in CLL User Case, it is CLL id + private String id; + + //Assurance or modifyBW + private String operation; + + // it can be Failed/Success + private String result; + + private String reason; + +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java new file mode 100644 index 0000000..2b4c44f --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.dmaap; + +import java.util.Date; + +public class NotificationEventModel { + + private String source; + + private Date timestamp; + + private NotificationEventEntity entity; +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java new file mode 100644 index 0000000..a93aac9 --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap; + +import com.google.gson.Gson; +import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback; +import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel; + +public class PolicyNotificationCallback implements NotificationCallback { + @Override + public void activateCallBack(String msg) { + NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class); + + //Todo analyze the event and Report to the Intent Flow; + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java new file mode 100644 index 0000000..4ad32bd --- /dev/null +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap; + +import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor; +import org.springframework.stereotype.Service; + +@Service +public class PolicyNotificationService { + + //config of policy dmaap event subscribe + private static final String MONITOR_CONFIG_FILE = "policy_dmaap_config.json"; + + public PolicyNotificationService(){ + init(); + } + + private void init(){ + MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new PolicyNotificationCallback()); + monitor.start(); + } +} diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java index c6cbfce..a05c1be 100644 --- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java +++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java @@ -46,14 +46,14 @@ public class DmaapUtil { return cut; } - public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){ + public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl, String consumerGroup, String consumerId){ MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder() .name(name) .topicUrl(topicUrl) .build(); MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() - .consumerGroup("1") - .consumerId("1") + .consumerGroup(consumerGroup) + .consumerId(consumerId) .sourceDefinition(sourceDefinition) .build(); |