aboutsummaryrefslogtreecommitdiffstats
path: root/intentanalysis/src/main/java
diff options
context:
space:
mode:
authorKeguang He <hekeguang@chinamobile.com>2023-04-11 05:51:45 +0000
committerGerrit Code Review <gerrit@onap.org>2023-04-11 05:51:45 +0000
commita893668c5c913ccb6cc3189fdce3331f866cd700 (patch)
treef57edf494f45895269199f482c263e657012fb68 /intentanalysis/src/main/java
parent1d7b2ee4fa358da12c9a515c7d15a765d9fb7004 (diff)
parentef6560bbb9bfa6248bf2afead5a9d02338aaf02f (diff)
Merge "Add DCAE and Policy Dmaap Listener Service"
Diffstat (limited to 'intentanalysis/src/main/java')
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java30
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java36
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java56
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java11
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java30
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java27
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java29
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java35
-rw-r--r--intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java6
9 files changed, 233 insertions, 27 deletions
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java
new file mode 100644
index 0000000..158cf91
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class DCAENotificationCallback implements NotificationCallback {
+
+ @Override
+ public void activateCallBack(String msg) {
+ NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+ //Todo analyze the event and Report to the Intent Flow;
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java
new file mode 100644
index 0000000..9ae8483
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.onap.usecaseui.intentanalysis.adapters.policy.dmaap.PolicyNotificationCallback;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DCAENotificationService {
+
+ //config of policy dmaap event subscribe
+ private static final String MONITOR_CONFIG_FILE = "dcae_dmaap_config.json";
+
+ public DCAENotificationService(){
+ init();
+ }
+
+ private void init(){
+ MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new DCAENotificationCallback());
+ monitor.start();
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
index 4712d0c..528805d 100644
--- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
@@ -21,7 +21,6 @@
package org.onap.usecaseui.intentanalysis.adapters.dmaap;
-import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@@ -29,7 +28,6 @@ import io.vavr.collection.List;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -53,30 +51,36 @@ import reactor.core.publisher.Mono;
*/
public class MRTopicMonitor implements Runnable {
- private final String name;
+ private final String configFileName;
+
private volatile boolean running = false;
+
private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+
private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+
private MRConsumerWrapper consumerWrapper;
+
private NotificationCallback callback;
/**
* Constructor
- * @param name name of topic subscriber in config
+ *
+ * @param configFileName name of topic subscriber file
* @param callback callbackfunction for received message
*/
- public MRTopicMonitor(String name, NotificationCallback callback){
- this.name = name;
+ public MRTopicMonitor(String configFileName, NotificationCallback callback) {
+ this.configFileName = configFileName;
this.callback = callback;
}
/**
* Start the monitoring thread
*/
- public void start(){
+ public void start() {
logger.info("Starting Dmaap Bus Monitor");
try {
- File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json");
+ File configFile = Resources.getResourceAsFile("dmaapConfig/" + configFileName);
String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject();
consumerWrapper = buildConsumerWrapper(jsonObject);
@@ -93,17 +97,16 @@ public class MRTopicMonitor implements Runnable {
* Main loop that keep fetching and processing
*/
@Override
- public void run(){
- while (running){
+ public void run() {
+ while (running) {
try {
- logger.debug("Topic: {} getting new msg...", name);
+ logger.debug("Topic: {} getting new msg...", consumerWrapper.getTopicName());
List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
- for (JsonElement msg : dmaapMsgs){
- logger.debug("Received message: {}" +
- "\r\n and processing start", msg);
+ for (JsonElement msg : dmaapMsgs) {
+ logger.debug("Received message: {}" + "\r\n and processing start", msg);
process(msg.toString());
}
- } catch (IOException | RuntimeException e){
+ } catch (IOException | RuntimeException e) {
logger.error("fetchMessage encountered error: {}", e);
}
}
@@ -113,7 +116,7 @@ public class MRTopicMonitor implements Runnable {
/**
* Stop the monitor
*/
- public void stop(){
+ public void stop() {
logger.info("{}: exiting", this);
running = false;
this.consumerWrapper.close();
@@ -123,7 +126,7 @@ public class MRTopicMonitor implements Runnable {
private void process(String msg) {
try {
callback.activateCallBack(msg);
- } catch (Exception e){
+ } catch (Exception e) {
logger.error("process message encountered error: {}", e);
}
}
@@ -132,8 +135,8 @@ public class MRTopicMonitor implements Runnable {
return this.consumerWrapper.fetch();
}
- private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
- throws IllegalArgumentException {
+ private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson)
+ throws IllegalArgumentException {
MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
return new MRConsumerWrapper(topicParams);
}
@@ -148,6 +151,7 @@ public class MRTopicMonitor implements Runnable {
* Name of the "protocol" property.
*/
protected static final String PROTOCOL_PROP = "Protocol";
+
/**
* Fetch timeout.
*/
@@ -160,11 +164,18 @@ public class MRTopicMonitor implements Runnable {
private final int sleepTime;
/**
+ * Topic Name to Subscribe
+ */
+ @Getter
+ private String topicName;
+
+ /**
* Counted down when {@link #close()} is invoked.
*/
private final CountDownLatch closeCondition = new CountDownLatch(1);
protected MessageRouterSubscriber subscriber;
+
protected MessageRouterSubscribeRequest request;
/**
@@ -173,6 +184,7 @@ public class MRTopicMonitor implements Runnable {
* @param MRTopicParams parameters for the bus topic
*/
protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+ this.topicName = MRTopicParams.getTopicName();
this.fetchTimeout = MRTopicParams.getFetchTimeout();
if (this.fetchTimeout <= 0) {
@@ -190,9 +202,10 @@ public class MRTopicMonitor implements Runnable {
throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
}
- try{
+ try {
this.subscriber = DmaapUtil.buildSubscriber();
- this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+ this.request = DmaapUtil.buildSubscriberRequest(topicName + "-Subscriber", MRTopicParams.getTopic(),
+ MRTopicParams.getConsumerGroup(), MRTopicParams.getConsumerInstance());
} catch (Exception e) {
throw new IllegalArgumentException("Illegal MrConsumer parameters");
@@ -202,6 +215,7 @@ public class MRTopicMonitor implements Runnable {
/**
* Try fetch new message. But backoff for some sleepTime when connection fails.
+ *
* @return
* @throws IOException
*/
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
index 7e2b3b6..6dcb932 100644
--- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
@@ -176,6 +176,14 @@ public class MRTopicParams {
return additionalProps != null;
}
+ public String getTopicName(){
+ if(null == this.topic){
+ return "";
+ }
+ String[] pmTopicSplit = this.topic.split("\\/");
+ return pmTopicSplit[pmTopicSplit.length - 1];
+ }
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
@@ -326,8 +334,6 @@ public class MRTopicParams {
if (topicUrl.startsWith("https")){
useHttps = true;
}
- String[] pmTopicSplit = topicUrl.split("\\/");
- topic = pmTopicSplit[pmTopicSplit.length - 1];
this.params.topic = topicUrl;
this.params.servers = servers;
@@ -370,6 +376,5 @@ public class MRTopicParams {
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java
new file mode 100644
index 0000000..3dce969
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+public class NotificationEventEntity {
+ //The entity id , currently in CLL User Case, it is CLL id
+ private String id;
+
+ //Assurance or modifyBW
+ private String operation;
+
+ // it can be Failed/Success
+ private String result;
+
+ private String reason;
+
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java
new file mode 100644
index 0000000..2b4c44f
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import java.util.Date;
+
+public class NotificationEventModel {
+
+ private String source;
+
+ private Date timestamp;
+
+ private NotificationEventEntity entity;
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java
new file mode 100644
index 0000000..a93aac9
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class PolicyNotificationCallback implements NotificationCallback {
+ @Override
+ public void activateCallBack(String msg) {
+ NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+ //Todo analyze the event and Report to the Intent Flow;
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java
new file mode 100644
index 0000000..4ad32bd
--- /dev/null
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.springframework.stereotype.Service;
+
+@Service
+public class PolicyNotificationService {
+
+ //config of policy dmaap event subscribe
+ private static final String MONITOR_CONFIG_FILE = "policy_dmaap_config.json";
+
+ public PolicyNotificationService(){
+ init();
+ }
+
+ private void init(){
+ MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new PolicyNotificationCallback());
+ monitor.start();
+ }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
index c6cbfce..a05c1be 100644
--- a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
+++ b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
@@ -46,14 +46,14 @@ public class DmaapUtil {
return cut;
}
- public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+ public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl, String consumerGroup, String consumerId){
MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
.name(name)
.topicUrl(topicUrl)
.build();
MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
- .consumerGroup("1")
- .consumerId("1")
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
.sourceDefinition(sourceDefinition)
.build();