diff options
author | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2024-06-10 17:08:04 +0100 |
---|---|---|
committer | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2024-06-11 09:57:17 +0100 |
commit | c616ee76ee72202bdf485de86b53a92837620c38 (patch) | |
tree | cf85106d3ac1f749616a90e160e1c90cc3c4f5fa | |
parent | a48f784beca5e7aa189217c52cfa83452cf8fc47 (diff) |
Add Synchronization topic in acm runtime
New sync topic for acm-ppnt synchronization
Added publisher for the sync topic
Refactor MessageDispatcherActivator for processing more than one topic
parameter.
Issue-ID: POLICY-5030
Change-Id: Id765b433beaf3f51fad9a9c66403a93d21c33797
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
19 files changed, 420 insertions, 45 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java index 29c2c01bd..e6e42e851 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java @@ -110,5 +110,10 @@ public enum ParticipantMessageType { * Used by acm runtime to migrate from a composition to another one in participants, triggers a * AUTOMATION_COMPOSITION_MIGRATION message with result of AUTOMATION_COMPOSITION_STATE_CHANGE operation. */ - AUTOMATION_COMPOSITION_MIGRATION + AUTOMATION_COMPOSITION_MIGRATION, + + /** + * Used by runtime to send composition and instances to sync participant replicas. + */ + PARTICIPANT_SYNC_MSG } diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java index 103be6891..119cdf030 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2023,2024 Nordix Foundation. + * Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ public class ParticipantRestart extends ParticipantMessage { // element definition private List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>(); - // automationcomposition instances list + // automation composition instances list private List<ParticipantRestartAc> automationcompositionList = new ArrayList<>(); /** @@ -52,6 +52,14 @@ public class ParticipantRestart extends ParticipantMessage { } /** + * Constructor with message type. + * @param messageType messageType + */ + public ParticipantRestart(ParticipantMessageType messageType) { + super(messageType); + } + + /** * Constructs the object, making a deep copy. * * @param source source from which to copy diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java new file mode 100644 index 000000000..33a730941 --- /dev/null +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.models.acm.messages.kafka.participant; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@ToString(callSuper = true) +public class ParticipantSync extends ParticipantRestart { + + /** + * Constructor. + */ + public ParticipantSync() { + super(ParticipantMessageType.PARTICIPANT_SYNC_MSG); + } + + /** + * Constructs the object, making a deep copy. + * + * @param source source from which to copy + */ + public ParticipantSync(ParticipantSync source) { + super(source); + } +} diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java index 3353de600..95b718e68 100644 --- a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java @@ -20,7 +20,6 @@ package org.onap.policy.clamp.models.acm.messages.kafka.participant; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable; import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields; @@ -43,7 +42,6 @@ class ParticipantRestartTest { @Test void testCopyConstructor() throws CoderException { - assertThatThrownBy(() -> new ParticipantRestart(null)).isInstanceOf(NullPointerException.class); final var orig = new ParticipantRestart(); // verify with null values diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java new file mode 100644 index 000000000..970b94824 --- /dev/null +++ b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java @@ -0,0 +1,86 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.models.acm.messages.kafka.participant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable; +import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.onap.policy.clamp.models.acm.concepts.AcElementRestart; +import org.onap.policy.clamp.models.acm.concepts.DeployState; +import org.onap.policy.clamp.models.acm.concepts.LockState; +import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; +import org.onap.policy.clamp.models.acm.utils.CommonTestData; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + + + +public class ParticipantSyncTest { + + @Test + void testCopyConstructor() throws CoderException { + + final var orig = new ParticipantSync(); + // verify with null values + assertEquals(removeVariableFields(orig.toString()), + removeVariableFields(new ParticipantSync(orig).toString())); + + orig.setMessageId(UUID.randomUUID()); + orig.setCompositionId(UUID.randomUUID()); + orig.setTimestamp(Instant.ofEpochMilli(3000)); + orig.setParticipantId(CommonTestData.getParticipantId()); + + var participantDefinitionUpdate = new ParticipantDefinition(); + var type = new ToscaConceptIdentifier("id", "1.2.3"); + var acDefinition = CommonTestData.getAcElementDefinition(type); + participantDefinitionUpdate.setAutomationCompositionElementDefinitionList(List.of(acDefinition)); + orig.setParticipantDefinitionUpdates(List.of(participantDefinitionUpdate)); + + var acElement = new AcElementRestart(); + acElement.setId(UUID.randomUUID()); + var id = new ToscaConceptIdentifier("id", "1.2.3"); + acElement.setDefinition(id); + acElement.setDeployState(DeployState.DEPLOYED); + acElement.setLockState(LockState.LOCKED); + acElement.setOperationalState("OperationalState"); + acElement.setUseState("UseState"); + acElement.setProperties(Map.of("key", "value")); + acElement.setOutProperties(Map.of("keyOut", "valueOut")); + + var acRestart = new ParticipantRestartAc(); + acRestart.setAcElementList(List.of(acElement)); + acRestart.setAutomationCompositionId(UUID.randomUUID()); + + orig.setAutomationcompositionList(List.of(acRestart)); + + assertEquals(removeVariableFields(orig.toString()), + removeVariableFields(new ParticipantSync(orig).toString())); + + assertSerializable(orig, ParticipantSync.class); + } +} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java index 0d9de205e..a3e55c3f7 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. + * Copyright (C) 2021,2024 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,9 +24,12 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import lombok.Getter; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; +import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; @@ -65,8 +68,14 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen topicSources = TopicEndpointManager.getManager() .addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources()); + var topics = acRuntimeParameterGroup.getTopics(); + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + var topicMap = topicSinks.stream() + .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity())); + + // @formatter:off addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), @@ -74,7 +83,8 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), - () -> publisher.active(topicSinks), + () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic()) + : topicMap.get(topics.getSyncTopic())), publisher::stop)); listeners.forEach(listener -> @@ -90,7 +100,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (final TopicSource source : topicSources) { + for (final var source : topicSources) { source.register(msgDispatcher); } } @@ -99,7 +109,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (final TopicSource source : topicSources) { + for (final var source : topicSources) { source.unregister(msgDispatcher); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java index a7acc47b3..a76a09d99 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. + * Copyright (C) 2021,2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; import java.util.List; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.common.endpoints.event.comm.TopicSink; /** @@ -28,7 +29,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink; */ public interface Publisher { - void active(List<TopicSink> topicSinks); + void active(TopicSink topicSink); void stop(); + + boolean isDefaultTopic(); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java index a30b531a4..a0b6fe13e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021,2023 Nordix Foundation. + * Copyright (C) 2021,2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,4 +50,8 @@ public class AcRuntimeParameterGroup { @Valid @NotNull private AcmParameters acmParameters = new AcmParameters(); + + @Valid + @NotNull + private Topics topics; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java new file mode 100644 index 000000000..d485a24ba --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.main.parameters; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.springframework.validation.annotation.Validated; + +@Getter +@Setter +@Validated +@AllArgsConstructor +public class Topics { + + private String operationTopic; + private String syncTopic; +} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java index 246d1c13f..5014f7dc3 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java @@ -22,7 +22,9 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import jakarta.ws.rs.core.Response.Status; import java.util.List; +import java.util.Optional; import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -47,11 +49,8 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe @Override - public void active(List<TopicSink> topicSinks) { - if (topicSinks.size() != 1) { - throw new IllegalArgumentException("Topic Sink must be one"); - } - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + public void active(TopicSink topicSink) { + this.topicSinkClient = new TopicSinkClient(topicSink); active = true; } @@ -59,4 +58,13 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe public void stop() { active = false; } + + /** + * Is default topic. + * @return true if default + */ + @Override + public boolean isDefaultTopic() { + return true; + } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java index d17cd7301..5afb7eba4 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java @@ -22,12 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import jakarta.ws.rs.core.Response.Status; import java.util.List; +import java.util.Optional; import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; + public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher { private TopicSinkClient topicSinkClient; @@ -47,11 +50,8 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> @Override - public void active(List<TopicSink> topicSinks) { - if (topicSinks.size() != 1) { - throw new IllegalArgumentException("Topic Sink must be one"); - } - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + public void active(TopicSink topicSink) { + this.topicSinkClient = new TopicSinkClient(topicSink); active = true; } @@ -59,4 +59,13 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> public void stop() { active = false; } + + /** + * Is default topic. + * @return true if default + */ + @Override + public boolean isDefaultTopic() { + return true; + } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java index 50fa6d11f..4f28eab8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java @@ -86,12 +86,12 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa super.send(message); } - private List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId, + protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId, AutomationCompositionDefinition acmDefinition) { var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - // list of entry entry filtered by participantId + // list of entry filtered by participantId List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>(); Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>(); for (var elementEntry : acElements) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java new file mode 100644 index 000000000..ae7eda1ee --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.supervision.comm; + +import io.micrometer.core.annotation.Timed; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; +import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; +import org.onap.policy.clamp.models.acm.utils.AcmUtils; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ParticipantSyncPublisher extends ParticipantRestartPublisher { + + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); + + private final AcRuntimeParameterGroup acRuntimeParameterGroup; + + public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { + super(acRuntimeParameterGroup); + this.acRuntimeParameterGroup = acRuntimeParameterGroup; + } + + + /** + * Send sync msg to Participant. + * + * @param participantId the ParticipantId + * @param acmDefinition the AutomationComposition Definition + * @param automationCompositions the list of automationCompositions + */ + @Override + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + List<AutomationComposition> automationCompositions) { + + var message = new ParticipantSync(); + message.setParticipantId(participantId); + message.setCompositionId(acmDefinition.getCompositionId()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + message.setState(acmDefinition.getState()); + message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); + + for (var automationComposition : automationCompositions) { + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + } + } + message.getAutomationcompositionList().add(syncAc); + } + + LOGGER.debug("Participant Sync sent {}", message); + super.send(message); + } + + /** + * Is default topic. + * @return true if default + */ + @Override + public boolean isDefaultTopic() { + return false; + } + +} diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml index d93418e5e..58e590b14 100644 --- a/runtime-acm/src/main/resources/application.yaml +++ b/runtime-acm/src/main/resources/application.yaml @@ -40,20 +40,29 @@ server: path: /error runtime: + topics: + operationTopic: policy-acruntime-participant + syncTopic: acm-ppnt-sync participantParameters: heartBeatMs: 20000 maxStatusWaitMs: 200000 topicParameterGroup: topicSources: - - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP fetchTimeout: 15000 topicSinks: - - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} + servers: + - ${topicServer:kafka:9092} + topicCommInfrastructure: NOOP + + - + topic: ${runtime.topics.syncTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java index 899e35f33..66595c89a 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java @@ -78,8 +78,8 @@ class MessageDispatcherActivatorTest { // repeat start - should throw an exception assertThatIllegalStateException().isThrownBy(activator::start); assertTrue(activator.isAlive()); - verify(publisherFirst, times(1)).active(anyList()); - verify(publisherSecond, times(1)).active(anyList()); + verify(publisherFirst, times(1)).active(any()); + verify(publisherSecond, times(1)).active(any()); var sco = CODER.decode("{messageType:" + TOPIC_FIRST + "}", StandardCoderObject.class); activator.getMsgDispatcher().onTopicEvent(null, "msg", sco); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java index 295d2d781..31cd659b3 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java @@ -36,6 +36,7 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler; @@ -74,7 +75,7 @@ class SupervisionMessagesTest { void testSendParticipantRegisterAck() { var acRegisterAckPublisher = new ParticipantRegisterAckPublisher(); var topicSink = mock(TopicSink.class); - acRegisterAckPublisher.active(List.of(topicSink)); + acRegisterAckPublisher.active(topicSink); acRegisterAckPublisher.send(new ParticipantRegisterAck()); verify(topicSink).send(anyString()); acRegisterAckPublisher.stop(); @@ -100,7 +101,7 @@ class SupervisionMessagesTest { void testSendParticipantDeregisterAck() { var acDeregisterAckPublisher = new ParticipantDeregisterAckPublisher(); var topicSink = mock(TopicSink.class); - acDeregisterAckPublisher.active(Collections.singletonList(topicSink)); + acDeregisterAckPublisher.active(topicSink); acDeregisterAckPublisher.send(new ParticipantDeregisterAck()); verify(topicSink).send(anyString()); acDeregisterAckPublisher.stop(); @@ -140,7 +141,7 @@ class SupervisionMessagesTest { void testSendAutomationCompositionStateChangePublisher() { var publisher = new AutomationCompositionStateChangePublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); publisher.send(getAutomationComposition(), 0, true); verify(topicSink).send(anyString()); publisher.stop(); @@ -151,7 +152,7 @@ class SupervisionMessagesTest { var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class), mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class)); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); publisher.sendDepriming(UUID.randomUUID()); verify(topicSink).send(anyString()); } @@ -173,7 +174,7 @@ class SupervisionMessagesTest { var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class), CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); serviceTemplate.setName("Name"); serviceTemplate.setVersion("1.0.0"); @@ -192,7 +193,7 @@ class SupervisionMessagesTest { void testParticipantStatusReqPublisher() { var publisher = new ParticipantStatusReqPublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); publisher.send(CommonTestData.getParticipantId()); verify(topicSink).send(anyString()); } @@ -201,7 +202,7 @@ class SupervisionMessagesTest { void testParticipantRegisterAckPublisher() { var publisher = new ParticipantRegisterAckPublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); publisher.send(UUID.randomUUID(), CommonTestData.getParticipantId()); verify(topicSink).send(anyString()); } @@ -210,7 +211,7 @@ class SupervisionMessagesTest { void testParticipantDeregisterAckPublisher() { var publisher = new ParticipantDeregisterAckPublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); publisher.send(UUID.randomUUID()); verify(topicSink).send(anyString()); } @@ -219,7 +220,7 @@ class SupervisionMessagesTest { void testAcElementPropertiesPublisher() { var publisher = new AcElementPropertiesPublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); publisher.send(automationComposition); @@ -230,7 +231,7 @@ class SupervisionMessagesTest { void testAutomationCompositionMigrationPublisher() { var publisher = new AutomationCompositionMigrationPublisher(); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); publisher.send(automationComposition, UUID.randomUUID()); @@ -241,7 +242,31 @@ class SupervisionMessagesTest { void testParticipantRestartPublisher() { var publisher = new ParticipantRestartPublisher(CommonTestData.getTestParamaterGroup()); var topicSink = mock(TopicSink.class); - publisher.active(List.of(topicSink)); + publisher.active(topicSink); + + var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); + var acmDefinition = new AutomationCompositionDefinition(); + acmDefinition.setCompositionId(UUID.randomUUID()); + acmDefinition.setServiceTemplate(serviceTemplate); + var acElements = AcmUtils + .extractAcElementsFromServiceTemplate(serviceTemplate, ""); + acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED)); + + var automationComposition = + InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud"); + + var participantId = automationComposition.getElements().values().iterator().next().getParticipantId(); + acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId); + + publisher.send(participantId, acmDefinition, List.of(automationComposition)); + verify(topicSink).send(anyString()); + } + + @Test + void testParticipantSyncPublisher() { + var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup()); + var topicSink = mock(TopicSink.class); + publisher.active(topicSink); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); var acmDefinition = new AutomationCompositionDefinition(); diff --git a/runtime-acm/src/test/resources/application-prometheus-noauth.yaml b/runtime-acm/src/test/resources/application-prometheus-noauth.yaml index 1c71252f3..620e7534d 100644 --- a/runtime-acm/src/test/resources/application-prometheus-noauth.yaml +++ b/runtime-acm/src/test/resources/application-prometheus-noauth.yaml @@ -19,13 +19,16 @@ server: context-path: /onap/policy/clamp/acm runtime: + topics: + operationTopic: policy-acruntime-participant + syncTopic: acm-ppnt-sync participantParameters: updateParameters: maxRetryCount: 3 topicParameterGroup: topicSources: - - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: ${runtime.topics.operationTopic} servers: - localhost topicCommInfrastructure: noop @@ -35,7 +38,12 @@ runtime: topicCommInfrastructure: noop servers: - localhost - topic: POLICY-ACRUNTIME-PARTICIPANT + topic: ${runtime.topics.operationTopic} + + - topic: ${runtime.topics.syncTopic} + servers: + - ${topicServer:kafka:9092} + topicCommInfrastructure: noop tracing: enabled: true diff --git a/runtime-acm/src/test/resources/application-test.yaml b/runtime-acm/src/test/resources/application-test.yaml index 13b1f788a..5d616d529 100644 --- a/runtime-acm/src/test/resources/application-test.yaml +++ b/runtime-acm/src/test/resources/application-test.yaml @@ -19,13 +19,16 @@ server: context-path: /onap/policy/clamp/acm runtime: + topics: + operationTopic: policy-acruntime-participant + syncTopic: acm-ppnt-sync participantParameters: updateParameters: maxRetryCount: 3 topicParameterGroup: topicSources: - - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} servers: - kafka:9092 topicCommInfrastructure: NOOP @@ -35,7 +38,12 @@ runtime: topicCommInfrastructure: NOOP servers: - kafka:9092 - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} + - + topic: ${runtime.topics.syncTopic} + servers: + - ${topicServer:kafka:9092} + topicCommInfrastructure: NOOP acmParameters: acElementName: org.onap.policy.clamp.acm.AutomationCompositionElement acNodeType: org.onap.policy.clamp.acm.AutomationComposition diff --git a/runtime-acm/src/test/resources/parameters/TestParameters.json b/runtime-acm/src/test/resources/parameters/TestParameters.json index 8192b7214..1558abc2e 100644 --- a/runtime-acm/src/test/resources/parameters/TestParameters.json +++ b/runtime-acm/src/test/resources/parameters/TestParameters.json @@ -20,10 +20,15 @@ "databasePassword": "P01icY", "persistenceUnit": "InstantiationTests" }, + "topics":{ + "operationTopic": "policy-acruntime-participant", + "syncTopic": "acm-ppnt-sync" + }, "topicParameterGroup": { + "topicSources": [ { - "topic": "POLICY-ACRUNTIME-PARTICIPANT", + "topic": "${topics.operationTopic}", "servers": [ "localhost" ], @@ -33,7 +38,14 @@ ], "topicSinks": [ { - "topic": "POLICY-ACRUNTIME-PARTICIPANT", + "topic": "${topics.operationTopic}", + "servers": [ + "localhost" + ], + "topicCommInfrastructure": "NOOP" + }, + { + "topic": "${topics.syncTopic}", "servers": [ "localhost" ], |