aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-10 17:08:04 +0100
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 09:57:17 +0100
commitc616ee76ee72202bdf485de86b53a92837620c38 (patch)
treecf85106d3ac1f749616a90e160e1c90cc3c4f5fa
parenta48f784beca5e7aa189217c52cfa83452cf8fc47 (diff)
Add Synchronization topic in acm runtime
New sync topic for acm-ppnt synchronization Added publisher for the sync topic Refactor MessageDispatcherActivator for processing more than one topic parameter. Issue-ID: POLICY-5030 Change-Id: Id765b433beaf3f51fad9a9c66403a93d21c33797 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java7
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java12
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java47
-rw-r--r--models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java2
-rw-r--r--models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java86
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java18
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java34
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java18
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java19
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java101
-rw-r--r--runtime-acm/src/main/resources/application.yaml13
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java4
-rw-r--r--runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java47
-rw-r--r--runtime-acm/src/test/resources/application-prometheus-noauth.yaml12
-rw-r--r--runtime-acm/src/test/resources/application-test.yaml12
-rw-r--r--runtime-acm/src/test/resources/parameters/TestParameters.json16
19 files changed, 420 insertions, 45 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java
index 29c2c01bd..e6e42e851 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java
@@ -110,5 +110,10 @@ public enum ParticipantMessageType {
* Used by acm runtime to migrate from a composition to another one in participants, triggers a
* AUTOMATION_COMPOSITION_MIGRATION message with result of AUTOMATION_COMPOSITION_STATE_CHANGE operation.
*/
- AUTOMATION_COMPOSITION_MIGRATION
+ AUTOMATION_COMPOSITION_MIGRATION,
+
+ /**
+ * Used by runtime to send composition and instances to sync participant replicas.
+ */
+ PARTICIPANT_SYNC_MSG
}
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
index 103be6891..119cdf030 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2023,2024 Nordix Foundation.
+ * Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public class ParticipantRestart extends ParticipantMessage {
// element definition
private List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
- // automationcomposition instances list
+ // automation composition instances list
private List<ParticipantRestartAc> automationcompositionList = new ArrayList<>();
/**
@@ -52,6 +52,14 @@ public class ParticipantRestart extends ParticipantMessage {
}
/**
+ * Constructor with message type.
+ * @param messageType messageType
+ */
+ public ParticipantRestart(ParticipantMessageType messageType) {
+ super(messageType);
+ }
+
+ /**
* Constructs the object, making a deep copy.
*
* @param source source from which to copy
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
new file mode 100644
index 000000000..33a730941
--- /dev/null
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString(callSuper = true)
+public class ParticipantSync extends ParticipantRestart {
+
+ /**
+ * Constructor.
+ */
+ public ParticipantSync() {
+ super(ParticipantMessageType.PARTICIPANT_SYNC_MSG);
+ }
+
+ /**
+ * Constructs the object, making a deep copy.
+ *
+ * @param source source from which to copy
+ */
+ public ParticipantSync(ParticipantSync source) {
+ super(source);
+ }
+}
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java
index 3353de600..95b718e68 100644
--- a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java
@@ -20,7 +20,6 @@
package org.onap.policy.clamp.models.acm.messages.kafka.participant;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
@@ -43,7 +42,6 @@ class ParticipantRestartTest {
@Test
void testCopyConstructor() throws CoderException {
- assertThatThrownBy(() -> new ParticipantRestart(null)).isInstanceOf(NullPointerException.class);
final var orig = new ParticipantRestart();
// verify with null values
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java
new file mode 100644
index 000000000..970b94824
--- /dev/null
+++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java
@@ -0,0 +1,86 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.utils.CommonTestData;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+
+
+
+public class ParticipantSyncTest {
+
+ @Test
+ void testCopyConstructor() throws CoderException {
+
+ final var orig = new ParticipantSync();
+ // verify with null values
+ assertEquals(removeVariableFields(orig.toString()),
+ removeVariableFields(new ParticipantSync(orig).toString()));
+
+ orig.setMessageId(UUID.randomUUID());
+ orig.setCompositionId(UUID.randomUUID());
+ orig.setTimestamp(Instant.ofEpochMilli(3000));
+ orig.setParticipantId(CommonTestData.getParticipantId());
+
+ var participantDefinitionUpdate = new ParticipantDefinition();
+ var type = new ToscaConceptIdentifier("id", "1.2.3");
+ var acDefinition = CommonTestData.getAcElementDefinition(type);
+ participantDefinitionUpdate.setAutomationCompositionElementDefinitionList(List.of(acDefinition));
+ orig.setParticipantDefinitionUpdates(List.of(participantDefinitionUpdate));
+
+ var acElement = new AcElementRestart();
+ acElement.setId(UUID.randomUUID());
+ var id = new ToscaConceptIdentifier("id", "1.2.3");
+ acElement.setDefinition(id);
+ acElement.setDeployState(DeployState.DEPLOYED);
+ acElement.setLockState(LockState.LOCKED);
+ acElement.setOperationalState("OperationalState");
+ acElement.setUseState("UseState");
+ acElement.setProperties(Map.of("key", "value"));
+ acElement.setOutProperties(Map.of("keyOut", "valueOut"));
+
+ var acRestart = new ParticipantRestartAc();
+ acRestart.setAcElementList(List.of(acElement));
+ acRestart.setAutomationCompositionId(UUID.randomUUID());
+
+ orig.setAutomationcompositionList(List.of(acRestart));
+
+ assertEquals(removeVariableFields(orig.toString()),
+ removeVariableFields(new ParticipantSync(orig).toString()));
+
+ assertSerializable(orig, ParticipantSync.class);
+ }
+}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
index 0d9de205e..a3e55c3f7 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,9 +24,12 @@ package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import lombok.Getter;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -65,8 +68,14 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
topicSources = TopicEndpointManager.getManager()
.addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
+ var topics = acRuntimeParameterGroup.getTopics();
+
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ var topicMap = topicSinks.stream()
+ .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity()));
+
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
@@ -74,7 +83,8 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
- () -> publisher.active(topicSinks),
+ () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic())
+ : topicMap.get(topics.getSyncTopic())),
publisher::stop));
listeners.forEach(listener ->
@@ -90,7 +100,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.register(msgDispatcher);
}
}
@@ -99,7 +109,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.unregister(msgDispatcher);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
index a7acc47b3..a76a09d99 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.util.List;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
/**
@@ -28,7 +29,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
*/
public interface Publisher {
- void active(List<TopicSink> topicSinks);
+ void active(TopicSink topicSink);
void stop();
+
+ boolean isDefaultTopic();
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
index a30b531a4..a0b6fe13e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2023 Nordix Foundation.
+ * Copyright (C) 2021,2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,4 +50,8 @@ public class AcRuntimeParameterGroup {
@Valid
@NotNull
private AcmParameters acmParameters = new AcmParameters();
+
+ @Valid
+ @NotNull
+ private Topics topics;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
new file mode 100644
index 000000000..d485a24ba
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.main.parameters;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.validation.annotation.Validated;
+
+@Getter
+@Setter
+@Validated
+@AllArgsConstructor
+public class Topics {
+
+ private String operationTopic;
+ private String syncTopic;
+}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
index 246d1c13f..5014f7dc3 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
@@ -22,7 +22,9 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
@@ -47,11 +49,8 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
@@ -59,4 +58,13 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
index d17cd7301..5afb7eba4 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
@@ -22,12 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher {
private TopicSinkClient topicSinkClient;
@@ -47,11 +50,8 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
@@ -59,4 +59,13 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
index 50fa6d11f..4f28eab8e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
@@ -86,12 +86,12 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
super.send(message);
}
- private List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
+ protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
AutomationCompositionDefinition acmDefinition) {
var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
- // list of entry entry filtered by participantId
+ // list of entry filtered by participantId
List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
for (var elementEntry : acElements) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
new file mode 100644
index 000000000..ae7eda1ee
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision.comm;
+
+import io.micrometer.core.annotation.Timed;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
+
+ private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+
+ public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
+ super(acRuntimeParameterGroup);
+ this.acRuntimeParameterGroup = acRuntimeParameterGroup;
+ }
+
+
+ /**
+ * Send sync msg to Participant.
+ *
+ * @param participantId the ParticipantId
+ * @param acmDefinition the AutomationComposition Definition
+ * @param automationCompositions the list of automationCompositions
+ */
+ @Override
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ List<AutomationComposition> automationCompositions) {
+
+ var message = new ParticipantSync();
+ message.setParticipantId(participantId);
+ message.setCompositionId(acmDefinition.getCompositionId());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ message.setState(acmDefinition.getState());
+ message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
+
+ for (var automationComposition : automationCompositions) {
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ for (var element : automationComposition.getElements().values()) {
+ if (participantId.equals(element.getParticipantId())) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+ }
+
+ LOGGER.debug("Participant Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+
+}
diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml
index d93418e5e..58e590b14 100644
--- a/runtime-acm/src/main/resources/application.yaml
+++ b/runtime-acm/src/main/resources/application.yaml
@@ -40,20 +40,29 @@ server:
path: /error
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
heartBeatMs: 20000
maxStatusWaitMs: 200000
topicParameterGroup:
topicSources:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+
+ -
+ topic: ${runtime.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java
index 899e35f33..66595c89a 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java
@@ -78,8 +78,8 @@ class MessageDispatcherActivatorTest {
// repeat start - should throw an exception
assertThatIllegalStateException().isThrownBy(activator::start);
assertTrue(activator.isAlive());
- verify(publisherFirst, times(1)).active(anyList());
- verify(publisherSecond, times(1)).active(anyList());
+ verify(publisherFirst, times(1)).active(any());
+ verify(publisherSecond, times(1)).active(any());
var sco = CODER.decode("{messageType:" + TOPIC_FIRST + "}", StandardCoderObject.class);
activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
index 295d2d781..31cd659b3 100644
--- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
+++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
@@ -36,6 +36,7 @@ import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
@@ -74,7 +75,7 @@ class SupervisionMessagesTest {
void testSendParticipantRegisterAck() {
var acRegisterAckPublisher = new ParticipantRegisterAckPublisher();
var topicSink = mock(TopicSink.class);
- acRegisterAckPublisher.active(List.of(topicSink));
+ acRegisterAckPublisher.active(topicSink);
acRegisterAckPublisher.send(new ParticipantRegisterAck());
verify(topicSink).send(anyString());
acRegisterAckPublisher.stop();
@@ -100,7 +101,7 @@ class SupervisionMessagesTest {
void testSendParticipantDeregisterAck() {
var acDeregisterAckPublisher = new ParticipantDeregisterAckPublisher();
var topicSink = mock(TopicSink.class);
- acDeregisterAckPublisher.active(Collections.singletonList(topicSink));
+ acDeregisterAckPublisher.active(topicSink);
acDeregisterAckPublisher.send(new ParticipantDeregisterAck());
verify(topicSink).send(anyString());
acDeregisterAckPublisher.stop();
@@ -140,7 +141,7 @@ class SupervisionMessagesTest {
void testSendAutomationCompositionStateChangePublisher() {
var publisher = new AutomationCompositionStateChangePublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(getAutomationComposition(), 0, true);
verify(topicSink).send(anyString());
publisher.stop();
@@ -151,7 +152,7 @@ class SupervisionMessagesTest {
var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class),
mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class));
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.sendDepriming(UUID.randomUUID());
verify(topicSink).send(anyString());
}
@@ -173,7 +174,7 @@ class SupervisionMessagesTest {
var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class),
CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
serviceTemplate.setName("Name");
serviceTemplate.setVersion("1.0.0");
@@ -192,7 +193,7 @@ class SupervisionMessagesTest {
void testParticipantStatusReqPublisher() {
var publisher = new ParticipantStatusReqPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(CommonTestData.getParticipantId());
verify(topicSink).send(anyString());
}
@@ -201,7 +202,7 @@ class SupervisionMessagesTest {
void testParticipantRegisterAckPublisher() {
var publisher = new ParticipantRegisterAckPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(UUID.randomUUID(), CommonTestData.getParticipantId());
verify(topicSink).send(anyString());
}
@@ -210,7 +211,7 @@ class SupervisionMessagesTest {
void testParticipantDeregisterAckPublisher() {
var publisher = new ParticipantDeregisterAckPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
publisher.send(UUID.randomUUID());
verify(topicSink).send(anyString());
}
@@ -219,7 +220,7 @@ class SupervisionMessagesTest {
void testAcElementPropertiesPublisher() {
var publisher = new AcElementPropertiesPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
publisher.send(automationComposition);
@@ -230,7 +231,7 @@ class SupervisionMessagesTest {
void testAutomationCompositionMigrationPublisher() {
var publisher = new AutomationCompositionMigrationPublisher();
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
var automationComposition =
InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
publisher.send(automationComposition, UUID.randomUUID());
@@ -241,7 +242,31 @@ class SupervisionMessagesTest {
void testParticipantRestartPublisher() {
var publisher = new ParticipantRestartPublisher(CommonTestData.getTestParamaterGroup());
var topicSink = mock(TopicSink.class);
- publisher.active(List.of(topicSink));
+ publisher.active(topicSink);
+
+ var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
+ var acmDefinition = new AutomationCompositionDefinition();
+ acmDefinition.setCompositionId(UUID.randomUUID());
+ acmDefinition.setServiceTemplate(serviceTemplate);
+ var acElements = AcmUtils
+ .extractAcElementsFromServiceTemplate(serviceTemplate, "");
+ acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
+
+ var automationComposition =
+ InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
+
+ var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+ acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId);
+
+ publisher.send(participantId, acmDefinition, List.of(automationComposition));
+ verify(topicSink).send(anyString());
+ }
+
+ @Test
+ void testParticipantSyncPublisher() {
+ var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+ var topicSink = mock(TopicSink.class);
+ publisher.active(topicSink);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
var acmDefinition = new AutomationCompositionDefinition();
diff --git a/runtime-acm/src/test/resources/application-prometheus-noauth.yaml b/runtime-acm/src/test/resources/application-prometheus-noauth.yaml
index 1c71252f3..620e7534d 100644
--- a/runtime-acm/src/test/resources/application-prometheus-noauth.yaml
+++ b/runtime-acm/src/test/resources/application-prometheus-noauth.yaml
@@ -19,13 +19,16 @@ server:
context-path: /onap/policy/clamp/acm
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
updateParameters:
maxRetryCount: 3
topicParameterGroup:
topicSources:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: ${runtime.topics.operationTopic}
servers:
- localhost
topicCommInfrastructure: noop
@@ -35,7 +38,12 @@ runtime:
topicCommInfrastructure: noop
servers:
- localhost
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: ${runtime.topics.operationTopic}
+
+ - topic: ${runtime.topics.syncTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: noop
tracing:
enabled: true
diff --git a/runtime-acm/src/test/resources/application-test.yaml b/runtime-acm/src/test/resources/application-test.yaml
index 13b1f788a..5d616d529 100644
--- a/runtime-acm/src/test/resources/application-test.yaml
+++ b/runtime-acm/src/test/resources/application-test.yaml
@@ -19,13 +19,16 @@ server:
context-path: /onap/policy/clamp/acm
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
updateParameters:
maxRetryCount: 3
topicParameterGroup:
topicSources:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
servers:
- kafka:9092
topicCommInfrastructure: NOOP
@@ -35,7 +38,12 @@ runtime:
topicCommInfrastructure: NOOP
servers:
- kafka:9092
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
+ -
+ topic: ${runtime.topics.syncTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
acmParameters:
acElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
acNodeType: org.onap.policy.clamp.acm.AutomationComposition
diff --git a/runtime-acm/src/test/resources/parameters/TestParameters.json b/runtime-acm/src/test/resources/parameters/TestParameters.json
index 8192b7214..1558abc2e 100644
--- a/runtime-acm/src/test/resources/parameters/TestParameters.json
+++ b/runtime-acm/src/test/resources/parameters/TestParameters.json
@@ -20,10 +20,15 @@
"databasePassword": "P01icY",
"persistenceUnit": "InstantiationTests"
},
+ "topics":{
+ "operationTopic": "policy-acruntime-participant",
+ "syncTopic": "acm-ppnt-sync"
+ },
"topicParameterGroup": {
+
"topicSources": [
{
- "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+ "topic": "${topics.operationTopic}",
"servers": [
"localhost"
],
@@ -33,7 +38,14 @@
],
"topicSinks": [
{
- "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+ "topic": "${topics.operationTopic}",
+ "servers": [
+ "localhost"
+ ],
+ "topicCommInfrastructure": "NOOP"
+ },
+ {
+ "topic": "${topics.syncTopic}",
"servers": [
"localhost"
],