summaryrefslogtreecommitdiffstats
path: root/participant
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 11:23:44 +0100
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 13:46:56 +0100
commit5d48bd15e1d799ba4419a8b6d960a089335b9852 (patch)
treed13c4559267335c3cb91e3b05a34c5ae945a0fbe /participant
parentb174e37eb1a41e9997c9455edacc36667e0c5c1a (diff)
Add Sync topic for participant Intermediary
Add new sync topic config for Intermediary Add sync topic listener Refactor IntermediaryActivator for processing multiple topic source Issue-ID: POLICY-5030 Change-Id: Idce9839a85571a92048e589bd82ce33699add640 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Diffstat (limited to 'participant')
-rw-r--r--participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml14
-rw-r--r--participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java24
-rw-r--r--participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml14
-rw-r--r--participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java26
-rw-r--r--participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml12
-rw-r--r--participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java24
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java49
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java42
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java13
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java44
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java4
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java6
-rw-r--r--participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java21
20 files changed, 310 insertions, 43 deletions
diff --git a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
index 011fafe50..18ffde61e 100644
--- a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
@@ -23,18 +23,26 @@ a1pms:
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
index ed68a4663..9e86d49ae 100644
--- a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
@@ -14,18 +14,26 @@ security:
enable-csrf: false
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
index aadad18ab..6ccd2fc9d 100644
--- a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
@@ -22,18 +22,26 @@ security:
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
index 969d77ab6..d66faee74 100644
--- a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
@@ -23,20 +23,26 @@ participant:
localChartDirectory: /home/policy/local-charts
infoFileName: CHART_INFO.json
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c02
clampAutomationCompositionTopics:
topicSources:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java b/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
index b806cdbfd..3bb6009a8 100644
--- a/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
@@ -39,7 +40,8 @@ public class CommonTestData {
public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
private static final UUID AC_ID = UUID.randomUUID();
private static final String KEY_NAME =
@@ -110,6 +112,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
}
return map;
@@ -133,8 +136,8 @@ public class CommonTestData {
public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
}
return map;
}
@@ -153,6 +156,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Get automation composition id.
* @return UUID automationCompositionId
*/
diff --git a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
index 6a357dd65..7a0ef8dd4 100644
--- a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
@@ -30,20 +30,26 @@ participant:
useHttps: true
allowSelfSignedCerts: true
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c03
clampAutomationCompositionTopics:
topicSources:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- -
- topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
index 3b2550db5..555383b42 100644
--- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
@@ -37,7 +38,8 @@ public class CommonTestData {
public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
@@ -124,6 +126,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
}
return map;
@@ -138,8 +141,8 @@ public class CommonTestData {
public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
}
return map;
}
@@ -149,7 +152,7 @@ public class CommonTestData {
*
* @return topic parameters
*/
- public static TopicParameters getTopicParams() {
+ public static TopicParameters getSinkTopicParams() {
final TopicParameters topicParams = new TopicParameters();
topicParams.setTopic("policy-acruntime-participant");
topicParams.setTopicCommInfrastructure("NOOP");
@@ -158,6 +161,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id
diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
index f3731d7a8..77a3ef443 100644
--- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
@@ -14,18 +14,26 @@ security:
enable-csrf: false
participant:
intermediaryParameters:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
reportingTimeIntervalMs: 120000
description: Participant Description
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
clampAutomationCompositionTopics:
topicSources:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+ fetchTimeout: 15000
+ - topic: ${participant.intermediaryparameters.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- - topic: policy-acruntime-participant
+ - topic: ${participant.intermediaryparameters.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java b/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
index 54f193039..5499931a2 100644
--- a/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
+++ b/participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.acm.participant.sim.parameters.ParticipantSimParameters;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -37,7 +38,8 @@ public class CommonTestData {
public static final Coder CODER = new StandardCoder();
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
- public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+ public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
/**
* Get ParticipantSimParameters.
@@ -77,6 +79,7 @@ public class CommonTestData {
map.put("participantId", getParticipantId());
map.put("clampAutomationCompositionTopics", getTopicParametersMap());
map.put("participantSupportedElementTypes", new ArrayList<>());
+ map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
return map;
}
@@ -88,8 +91,8 @@ public class CommonTestData {
*/
private static Map<String, Object> getTopicParametersMap() {
final Map<String, Object> map = new TreeMap<>();
- map.put("topicSources", TOPIC_PARAMS);
- map.put("topicSinks", TOPIC_PARAMS);
+ map.put("topicSources", SOURCE_TOPIC_PARAMS);
+ map.put("topicSinks", SINK_TOPIC_PARAMS);
return map;
}
@@ -98,7 +101,7 @@ public class CommonTestData {
*
* @return topic parameters
*/
- private static TopicParameters getTopicParams() {
+ private static TopicParameters getSinkTopicParams() {
final TopicParameters topicParams = new TopicParameters();
topicParams.setTopic("policy-acruntime-participant");
topicParams.setTopicCommInfrastructure("NOOP");
@@ -107,6 +110,19 @@ public class CommonTestData {
}
/**
+ * Returns sync topic parameters for test cases.
+ *
+ * @return topic parameters
+ */
+ private static TopicParameters getSyncTopicParams() {
+ final TopicParameters topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
index ec14e6cd7..4ed89dce8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
@@ -64,4 +64,9 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
index bf2892aa8..87eb5c8a1 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
@@ -61,4 +61,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
new file mode 100644
index 000000000..0b359f98a
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.comm;
+
+import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ParticipantSyncListener extends ParticipantListener<ParticipantSync> {
+
+ /**
+ * Constructs the object.
+ *
+ * @param participantHandler the handler for managing the state of the participant
+ */
+ public ParticipantSyncListener(ParticipantHandler participantHandler) {
+ super(ParticipantSync.class, participantHandler, participantHandler::handleParticipantSync);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_SYNC_MSG.name();
+ }
+
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index a77d5242a..2c54a22dd 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +28,7 @@ import java.util.Timer;
import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -55,6 +56,9 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
@Getter
private final MessageTypeDispatcher msgDispatcher;
+ @Getter
+ private final MessageTypeDispatcher syncMsgDispatcher;
+
/**
* Instantiate the activator for participant.
*
@@ -75,22 +79,32 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
+ listeners.stream().filter(Listener::isDefaultTopic)
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
+ listeners.stream().filter(l -> ! l.isDefaultTopic())
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> syncMsgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> syncMsgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+ var topics = parameters.getIntermediaryParameters().getTopics();
+
+ addAction("Topic Message Dispatcher", () -> this.registerMsgDispatcher(topics),
+ () -> this.unregisterMsgDispatcher(topics));
// @formatter:on
}
@@ -133,18 +147,26 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
/**
* Registers the dispatcher with the topic source(s).
*/
- private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.register(msgDispatcher);
+ private void registerMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.register(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.register(syncMsgDispatcher);
+ }
}
}
/**
* Unregisters the dispatcher from the topic source(s).
*/
- private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.unregister(msgDispatcher);
+ private void unregisterMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.unregister(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.unregister(syncMsgDispatcher);
+ }
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
index 56ed55441..27585cf20 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,4 +38,10 @@ public interface Listener<T> {
* @return listener to register
*/
ScoListener<T> getScoListener();
+
+ /**
+ * Check if default topic.
+ * @return true if default topic
+ */
+ boolean isDefaultTopic();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 54a05912a..7ac58ae6c 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -39,6 +39,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.slf4j.Logger;
@@ -209,6 +210,18 @@ public class ParticipantHandler {
}
/**
+ * Handle a ParticipantSync message.
+ *
+ * @param participantSyncMsg the participantSync message
+ */
+ @Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
+ public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+ LOGGER.debug("ParticipantSync message received for participantId {}",
+ participantSyncMsg.getParticipantId());
+ acDefinitionHandler.handleParticipantRestart(participantSyncMsg);
+ }
+
+ /**
* Dispatch a heartbeat for this participant.
*/
public void sendHeartbeat() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
index 54774044b..1c36ad17f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,4 +60,8 @@ public class ParticipantIntermediaryParameters {
@Valid
private List<ParticipantSupportedElementType> participantSupportedElementTypes;
+ @NotNull
+ @Valid
+ private Topics topics;
+
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
new file mode 100644
index 000000000..ddf72052f
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.parameters;
+
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Class to hold topic names for operation and synchronization.
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+public class Topics {
+
+ @NotNull
+ @Valid
+ private String operationTopic;
+
+ @NotNull
+ @Valid
+ private String syncTopic;
+}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
index 10f9d4586..d7c97bd00 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
@@ -82,6 +82,10 @@ class ParticipantCommTest {
assertEquals(ParticipantMessageType.PARTICIPANT_RESTART.name(),
participantRestartListener.getType());
+ var participantSyncListener = new ParticipantSyncListener(participantHandler);
+ assertEquals(ParticipantMessageType.PARTICIPANT_SYNC_MSG.name(),
+ participantSyncListener.getType());
+
var acMigrationListener = new AutomationCompositionMigrationListener(participantHandler);
assertEquals(ParticipantMessageType.AUTOMATION_COMPOSITION_MIGRATION.name(), acMigrationListener.getType());
}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
index ac9cbfad7..8868c733a 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
@@ -34,8 +34,10 @@ import static org.mockito.Mockito.when;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantSyncListener;
import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
@@ -59,10 +61,12 @@ class IntermediaryActivatorTest {
var listenerFirst = mock(ParticipantStatusReqListener.class);
when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
when(listenerFirst.getScoListener()).thenReturn(listenerFirst);
+ when(listenerFirst.isDefaultTopic()).thenReturn(true);
var listenerSecond = mock(ParticipantStatusReqListener.class);
when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
+ when(listenerSecond.isDefaultTopic()).thenReturn(false);
List<Listener<ParticipantStatusReq>> listeners = List.of(listenerFirst, listenerSecond);
@@ -84,7 +88,7 @@ class IntermediaryActivatorTest {
verify(listenerFirst, times(1)).onTopicEvent(any(), any(), any());
sco = CODER.decode("{messageType:" + TOPIC_SECOND + "}", StandardCoderObject.class);
- activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
+ activator.getSyncMsgDispatcher().onTopicEvent(null, "msg", sco);
verify(listenerSecond, times(1)).onTopicEvent(any(), any(), any());
activator.close();
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
index 1536a0be0..e8cafa96f 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
import java.util.UUID;
import org.onap.policy.clamp.acm.participant.intermediary.handler.DummyParticipantParameters;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -58,6 +59,7 @@ public class CommonTestData {
public static final String DESCRIPTION = "Participant description";
public static final long TIME_INTERVAL = 2000;
public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+ public static final List<TopicParameters> TOPIC_SOURCE_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
public static final Coder CODER = new StandardCoder();
public static final UUID AC_ID_0 = UUID.randomUUID();
public static final UUID AC_ID_1 = UUID.randomUUID();
@@ -116,6 +118,7 @@ public class CommonTestData {
map.put("description", DESCRIPTION);
map.put("reportingTimeIntervalMs", TIME_INTERVAL);
map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
+ map.put("topics", getTopics());
var supportedElementType = new ParticipantSupportedElementType();
supportedElementType.setTypeName("org.onap.policy.clamp.acm.HttpAutomationCompositionElement");
supportedElementType.setTypeVersion("1.0.0");
@@ -133,7 +136,7 @@ public class CommonTestData {
public static Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
final Map<String, Object> map = new TreeMap<>();
if (!isEmpty) {
- map.put("topicSources", TOPIC_PARAMS);
+ map.put("topicSources", TOPIC_SOURCE_PARAMS);
map.put("topicSinks", TOPIC_PARAMS);
}
return map;
@@ -153,6 +156,22 @@ public class CommonTestData {
}
/**
+ * Returns topic parameters for sync topic.
+ * @return topicparamaters
+ */
+ public static TopicParameters getSyncTopicParams() {
+ final var topicParams = new TopicParameters();
+ topicParams.setTopic("acm-ppnt-sync");
+ topicParams.setTopicCommInfrastructure("NOOP");
+ topicParams.setServers(List.of("localhost"));
+ return topicParams;
+ }
+
+ private static Topics getTopics() {
+ return new Topics("policy-acruntime-participant", "acm-ppnt-sync");
+ }
+
+ /**
* Returns participantId for test cases.
*
* @return participant Id