From c616ee76ee72202bdf485de86b53a92837620c38 Mon Sep 17 00:00:00 2001 From: rameshiyer27 Date: Mon, 10 Jun 2024 17:08:04 +0100 Subject: Add Synchronization topic in acm runtime New sync topic for acm-ppnt synchronization Added publisher for the sync topic Refactor MessageDispatcherActivator for processing more than one topic parameter. Issue-ID: POLICY-5030 Change-Id: Id765b433beaf3f51fad9a9c66403a93d21c33797 Signed-off-by: zrrmmua --- .../messaging/MessageDispatcherActivator.java | 18 +++- .../acm/runtime/config/messaging/Publisher.java | 7 +- .../main/parameters/AcRuntimeParameterGroup.java | 6 +- .../clamp/acm/runtime/main/parameters/Topics.java | 34 +++++++ .../comm/AbstractParticipantAckPublisher.java | 18 +++- .../comm/AbstractParticipantPublisher.java | 19 +++- .../comm/ParticipantRestartPublisher.java | 4 +- .../supervision/comm/ParticipantSyncPublisher.java | 101 +++++++++++++++++++++ runtime-acm/src/main/resources/application.yaml | 13 ++- 9 files changed, 199 insertions(+), 21 deletions(-) create mode 100644 runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java create mode 100644 runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java (limited to 'runtime-acm/src/main') diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java index 0d9de205e..a3e55c3f7 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. + * Copyright (C) 2021,2024 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,9 +24,12 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import lombok.Getter; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; +import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; @@ -65,8 +68,14 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen topicSources = TopicEndpointManager.getManager() .addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources()); + var topics = acRuntimeParameterGroup.getTopics(); + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + var topicMap = topicSinks.stream() + .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity())); + + // @formatter:off addAction("Topic endpoint management", () -> TopicEndpointManager.getManager().start(), @@ -74,7 +83,8 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen publishers.forEach(publisher -> addAction("Publisher " + publisher.getClass().getSimpleName(), - () -> publisher.active(topicSinks), + () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic()) + : topicMap.get(topics.getSyncTopic())), publisher::stop)); listeners.forEach(listener -> @@ -90,7 +100,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Registers the dispatcher with the topic source(s). */ private void registerMsgDispatcher() { - for (final TopicSource source : topicSources) { + for (final var source : topicSources) { source.register(msgDispatcher); } } @@ -99,7 +109,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen * Unregisters the dispatcher from the topic source(s). */ private void unregisterMsgDispatcher() { - for (final TopicSource source : topicSources) { + for (final var source : topicSources) { source.unregister(msgDispatcher); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java index a7acc47b3..a76a09d99 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. + * Copyright (C) 2021,2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; import java.util.List; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.common.endpoints.event.comm.TopicSink; /** @@ -28,7 +29,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink; */ public interface Publisher { - void active(List topicSinks); + void active(TopicSink topicSink); void stop(); + + boolean isDefaultTopic(); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java index a30b531a4..a0b6fe13e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021,2023 Nordix Foundation. + * Copyright (C) 2021,2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,4 +50,8 @@ public class AcRuntimeParameterGroup { @Valid @NotNull private AcmParameters acmParameters = new AcmParameters(); + + @Valid + @NotNull + private Topics topics; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java new file mode 100644 index 000000000..d485a24ba --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.main.parameters; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.springframework.validation.annotation.Validated; + +@Getter +@Setter +@Validated +@AllArgsConstructor +public class Topics { + + private String operationTopic; + private String syncTopic; +} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java index 246d1c13f..5014f7dc3 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java @@ -22,7 +22,9 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import jakarta.ws.rs.core.Response.Status; import java.util.List; +import java.util.Optional; import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -47,11 +49,8 @@ public abstract class AbstractParticipantAckPublisher topicSinks) { - if (topicSinks.size() != 1) { - throw new IllegalArgumentException("Topic Sink must be one"); - } - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + public void active(TopicSink topicSink) { + this.topicSinkClient = new TopicSinkClient(topicSink); active = true; } @@ -59,4 +58,13 @@ public abstract class AbstractParticipantAckPublisher implements Publisher { private TopicSinkClient topicSinkClient; @@ -47,11 +50,8 @@ public abstract class AbstractParticipantPublisher @Override - public void active(List topicSinks) { - if (topicSinks.size() != 1) { - throw new IllegalArgumentException("Topic Sink must be one"); - } - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + public void active(TopicSink topicSink) { + this.topicSinkClient = new TopicSinkClient(topicSink); active = true; } @@ -59,4 +59,13 @@ public abstract class AbstractParticipantPublisher public void stop() { active = false; } + + /** + * Is default topic. + * @return true if default + */ + @Override + public boolean isDefaultTopic() { + return true; + } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java index 50fa6d11f..4f28eab8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java @@ -86,12 +86,12 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher prepareParticipantRestarting(UUID participantId, + protected List prepareParticipantRestarting(UUID participantId, AutomationCompositionDefinition acmDefinition) { var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - // list of entry entry filtered by participantId + // list of entry filtered by participantId List> elementList = new ArrayList<>(); Map supportedElementMap = new HashMap<>(); for (var elementEntry : acElements) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java new file mode 100644 index 000000000..ae7eda1ee --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -0,0 +1,101 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.supervision.comm; + +import io.micrometer.core.annotation.Timed; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; +import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; +import org.onap.policy.clamp.models.acm.utils.AcmUtils; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + + +@Component +public class ParticipantSyncPublisher extends ParticipantRestartPublisher { + + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); + + private final AcRuntimeParameterGroup acRuntimeParameterGroup; + + public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { + super(acRuntimeParameterGroup); + this.acRuntimeParameterGroup = acRuntimeParameterGroup; + } + + + /** + * Send sync msg to Participant. + * + * @param participantId the ParticipantId + * @param acmDefinition the AutomationComposition Definition + * @param automationCompositions the list of automationCompositions + */ + @Override + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + List automationCompositions) { + + var message = new ParticipantSync(); + message.setParticipantId(participantId); + message.setCompositionId(acmDefinition.getCompositionId()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + message.setState(acmDefinition.getState()); + message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); + + for (var automationComposition : automationCompositions) { + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + for (var element : automationComposition.getElements().values()) { + if (participantId.equals(element.getParticipantId())) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + } + } + message.getAutomationcompositionList().add(syncAc); + } + + LOGGER.debug("Participant Sync sent {}", message); + super.send(message); + } + + /** + * Is default topic. + * @return true if default + */ + @Override + public boolean isDefaultTopic() { + return false; + } + +} diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml index d93418e5e..58e590b14 100644 --- a/runtime-acm/src/main/resources/application.yaml +++ b/runtime-acm/src/main/resources/application.yaml @@ -40,20 +40,29 @@ server: path: /error runtime: + topics: + operationTopic: policy-acruntime-participant + syncTopic: acm-ppnt-sync participantParameters: heartBeatMs: 20000 maxStatusWaitMs: 200000 topicParameterGroup: topicSources: - - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP fetchTimeout: 15000 topicSinks: - - topic: policy-acruntime-participant + topic: ${runtime.topics.operationTopic} + servers: + - ${topicServer:kafka:9092} + topicCommInfrastructure: NOOP + + - + topic: ${runtime.topics.syncTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP -- cgit 1.2.3-korg