summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 11:23:44 +0100
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 13:46:56 +0100
commit5d48bd15e1d799ba4419a8b6d960a089335b9852 (patch)
treed13c4559267335c3cb91e3b05a34c5ae945a0fbe
parentb174e37eb1a41e9997c9455edacc36667e0c5c1a (diff)
Add Sync topic for participant Intermediary
Add new sync topic config for Intermediary Add sync topic listener Refactor IntermediaryActivator for processing multiple topic source Issue-ID: POLICY-5030 Change-Id: Idce9839a85571a92048e589bd82ce33699add640 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
-rw-r--r--participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml14
-rw-r--r--participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java24
-rw-r--r--participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml14
-rw-r--r--participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java26
-rw-r--r--participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java24
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java49
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java42
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java13
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java44
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java4
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java6
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java21
20 files changed, 310 insertions, 43 deletions
diff --git a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
index 011fafe50..18ffde61e 100644
--- a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
@@ -23,18 +23,26 @@ a1pms:
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
index ed68a4663..9e86d49ae 100644
--- a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
@@ -14,18 +14,26 @@ security:
enable-csrf: false
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
index aadad18ab..6ccd2fc9d 100644
--- a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
@@ -22,18 +22,26 @@ security:
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
index 969d77ab6..d66faee74 100644
--- a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
@@ -23,20 +23,26 @@ participant:
localChartDirectory: /home/policy/local-charts
infoFileName: CHART_INFO.json
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c02
clampAutomationCompositionTopics:
topicSources:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java b/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
index b806cdbfd..3bb6009a8 100644
--- a/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
@@ -39,7 +40,8 @@ public class CommonTestData {
public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
private static final UUID AC_ID = UUID.randomUUID();
private static final String KEY_NAME =
@@ -110,6 +112,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
}
return map;
@@ -133,8 +136,8 @@ public class CommonTestData {
public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
}
return map;
}
@@ -153,6 +156,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Get automation composition id.
* @return UUID automationCompositionId
*/
diff --git a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
index 6a357dd65..7a0ef8dd4 100644
--- a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
@@ -30,20 +30,26 @@ participant:
useHttps: true
allowSelfSignedCerts: true
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c03
clampAutomationCompositionTopics:
topicSources:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
index 3b2550db5..555383b42 100644
--- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
@@ -37,7 +38,8 @@ public class CommonTestData {
public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
@@ -124,6 +126,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
}
return map;
@@ -138,8 +141,8 @@ public class CommonTestData {
public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
}
return map;
}
@@ -149,7 +152,7 @@ public class CommonTestData {
*
* @return topic parameters
*/
- public static TopicParameters getTopicParams() {
+ public static TopicParameters getSinkTopicParams() {
final TopicParameters topicParams = new TopicParameters();
topicParams.setTopic("policy-acruntime-participant");
topicParams.setTopicCommInfrastructure("NOOP");
@@ -158,6 +161,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id
diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
index f3731d7a8..77a3ef443 100644
--- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
@@ -14,18 +14,26 @@ security:
enable-csrf: false
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java b/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
index 54f193039..5499931a2 100644
--- a/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.acm.participant.sim.parameters.ParticipantSimParameters;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -37,7 +38,8 @@ public class CommonTestData {
public static final Coder CODER = new StandardCoder();
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
/**
* Get ParticipantSimParameters.
@@ -77,6 +79,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap());
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
return map;
}
@@ -88,8 +91,8 @@ public class CommonTestData {
*/
private static Map<String, Object> getTopicParametersMap() {
final Map<String, Object> map = new TreeMap<>();
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
return map;
}
@@ -98,7 +101,7 @@ public class CommonTestData {
*
* @return topic parameters
*/
- private static TopicParameters getTopicParams() {
+ private static TopicParameters getSinkTopicParams() {
final TopicParameters topicParams = new TopicParameters();
topicParams.setTopic("policy-acruntime-participant");
topicParams.setTopicCommInfrastructure("NOOP");
@@ -107,6 +110,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ private static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
index ec14e6cd7..4ed89dce8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
@@ -64,4 +64,9 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
index bf2892aa8..87eb5c8a1 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
@@ -61,4 +61,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
new file mode 100644
index 000000000..0b359f98a
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.comm;
+
+import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ParticipantSyncListener extends ParticipantListener<ParticipantSync> {
+
+ /**
+ * Constructs the object.
+ *
+ * @param participantHandler the handler for managing the state of the participant
+ */
+ public ParticipantSyncListener(ParticipantHandler participantHandler) {
+ super(ParticipantSync.class, participantHandler, participantHandler::handleParticipantSync);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_SYNC_MSG.name();
+ }
+
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index a77d5242a..2c54a22dd 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +28,7 @@ import java.util.Timer;
import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -55,6 +56,9 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
@Getter
private final MessageTypeDispatcher msgDispatcher;
+ @Getter
+ private final MessageTypeDispatcher syncMsgDispatcher;
+
/**
* Instantiate the activator for participant.
*
@@ -75,22 +79,32 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
+ listeners.stream().filter(Listener::isDefaultTopic)
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
+ listeners.stream().filter(l -> ! l.isDefaultTopic())
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> syncMsgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> syncMsgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+ var topics = parameters.getIntermediaryParameters().getTopics();
+
+ addAction("Topic Message Dispatcher", () -> this.registerMsgDispatcher(topics),
+ () -> this.unregisterMsgDispatcher(topics));
// @formatter:on
}
@@ -133,18 +147,26 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
/**
* Registers the dispatcher with the topic source(s).
*/
- private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.register(msgDispatcher);
+ private void registerMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.register(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.register(syncMsgDispatcher);
+ }
}
}
/**
* Unregisters the dispatcher from the topic source(s).
*/
- private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.unregister(msgDispatcher);
+ private void unregisterMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.unregister(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.unregister(syncMsgDispatcher);
+ }
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
index 56ed55441..27585cf20 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,4 +38,10 @@ public interface Listener<T> {
* @return listener to register
*/
ScoListener<T> getScoListener();
+
+ /**
+ * Check if default topic.
+ * @return true if default topic
+ */
+ boolean isDefaultTopic();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 54a05912a..7ac58ae6c 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -39,6 +39,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.slf4j.Logger;
@@ -209,6 +210,18 @@ public class ParticipantHandler {
}
/**
+ * Handle a ParticipantSync message.
+ *
+ * @param participantSyncMsg the participantSync message
+ */
+ @Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
+ public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+ LOGGER.debug("ParticipantSync message received for participantId {}",
+ participantSyncMsg.getParticipantId());
+ acDefinitionHandler.handleParticipantRestart(participantSyncMsg);
+ }
+
+ /**
* Dispatch a heartbeat for this participant.
*/
public void sendHeartbeat() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
index 54774044b..1c36ad17f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,4 +60,8 @@ public class ParticipantIntermediaryParameters {
@Valid
private List<ParticipantSupportedElementType> participantSupportedElementTypes;
+ @NotNull
+ @Valid
+ private Topics topics;
+
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
new file mode 100644
index 000000000..ddf72052f
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.parameters;
+
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Class to hold topic names for operation and synchronization.
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+public class Topics {
+
+ @NotNull
+ @Valid
+ private String operationTopic;
+
+ @NotNull
+ @Valid
+ private String syncTopic;
+}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
index 10f9d4586..d7c97bd00 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
@@ -82,6 +82,10 @@ class ParticipantCommTest {
assertEquals(ParticipantMessageType.PARTICIPANT_RESTART.name(),
participantRestartListener.getType());
+ var participantSyncListener = new ParticipantSyncListener(participantHandler);
+ assertEquals(ParticipantMessageType.PARTICIPANT_SYNC_MSG.name(),
+ participantSyncListener.getType());
+
var acMigrationListener = new AutomationCompositionMigrationListener(participantHandler);
assertEquals(ParticipantMessageType.AUTOMATION_COMPOSITION_MIGRATION.name(), acMigrationListener.getType());
}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
index ac9cbfad7..8868c733a 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
@@ -34,8 +34,10 @@ import static org.mockito.Mockito.when;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantSyncListener;
import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
@@ -59,10 +61,12 @@ class IntermediaryActivatorTest {
var listenerFirst = mock(ParticipantStatusReqListener.class);
when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
when(listenerFirst.getScoListener()).thenReturn(listenerFirst);
+ when(listenerFirst.isDefaultTopic()).thenReturn(true);
var listenerSecond = mock(ParticipantStatusReqListener.class);
when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
+ when(listenerSecond.isDefaultTopic()).thenReturn(false);
List<Listener<ParticipantStatusReq>> listeners = List.of(listenerFirst, listenerSecond);
@@ -84,7 +88,7 @@ class IntermediaryActivatorTest {
verify(listenerFirst, times(1)).onTopicEvent(any(), any(), any());
sco = CODER.decode("{messageType:" + TOPIC_SECOND + "}", StandardCoderObject.class);
- activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
+ activator.getSyncMsgDispatcher().onTopicEvent(null, "msg", sco);
verify(listenerSecond, times(1)).onTopicEvent(any(), any(), any());
activator.close();
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
index 1536a0be0..e8cafa96f 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
import java.util.UUID;
import org.onap.policy.clamp.acm.participant.intermediary.handler.DummyParticipantParameters;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -58,6 +59,7 @@ public class CommonTestData {
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> TOPIC_SOURCE_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
public static final UUID AC_ID_0 = UUID.randomUUID();
public static final UUID AC_ID_1 = UUID.randomUUID();
@@ -116,6 +118,7 @@ public class CommonTestData {
map.put("description", DESCRIPTION);
map.put("reportingTimeIntervalMs", TIME_INTERVAL);
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
+ map.put("topics", getTopics());
var supportedElementType = new ParticipantSupportedElementType();
supportedElementType.setTypeName("org.onap.policy.clamp.acm.HttpAutomationCompositionElement");
supportedElementType.setTypeVersion("1.0.0");
@@ -133,7 +136,7 @@ public class CommonTestData {
public static Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
+ map.put("topicSources", TOPIC_SOURCE_PARAMS);
map.put("topicSinks", TOPIC_PARAMS);
}
return map;
@@ -153,6 +156,22 @@ public class CommonTestData {
}
/**
+ * Returns topic parameters for sync topic.
+ * @return topicparamaters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final var topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ private static Topics getTopics() {
+ return new Topics("policy-acruntime-participant", "acm-ppnt-sync");
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id