diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2022-12-19 14:43:11 +0000 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2022-12-20 10:35:32 +0000 |
commit | c71863696d729286502f951a64c3d2193c6641e7 (patch) | |
tree | 024363ddde071c3f59d7468bdc9ddfd011d3701e /runtime-acm/src/main/java | |
parent | 66208f7d9ec65d859803ed347c3fdecd2d99022f (diff) |
Add compositionId into Messages between ACM and Participants
Issue-ID: POLICY-4489
Change-Id: I1fb8c04eacee040d5c944c522ca59a2a9a50376b
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'runtime-acm/src/main/java')
7 files changed, 118 insertions, 185 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index 66a67a304..1d3b2992b 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -98,7 +98,7 @@ public class CommissioningProvider { serviceTemplate = acmDefinition.getServiceTemplate(); var participantList = participantProvider.getParticipants(); if (!participantList.isEmpty()) { - supervisionHandler.handleSendCommissionMessage(serviceTemplate.getName(), serviceTemplate.getVersion()); + supervisionHandler.handleSendCommissionMessage(acmDefinition); } return createCommissioningResponse(acmDefinition.getCompositionId(), serviceTemplate); } @@ -145,7 +145,7 @@ public class CommissioningProvider { } var participantList = participantProvider.getParticipants(); if (!participantList.isEmpty()) { - supervisionHandler.handleSendDeCommissionMessage(); + supervisionHandler.handleSendDeCommissionMessage(compositionId); } var serviceTemplate = acDefinitionProvider.deleteAcDefintion(compositionId); return createCommissioningResponse(compositionId, serviceTemplate); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index ea851da81..8d1f98388 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. + * Copyright (C) 2021-2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,14 +26,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.aspectj.lang.annotation.After; -import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; -import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantStatus; -import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdateAck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -75,29 +71,6 @@ public class SupervisionAspect implements Closeable { executor.execute(() -> supervisionScanner.handleParticipantStatus(participantStatusMessage.getParticipantId())); } - /** - * Intercepts participant Register Message - * if there is a Commissioning starts an execution of handleParticipantRegister. - * - * @param participantRegisterMessage the ParticipantRegister message - * @param isCommissioning is Commissioning - */ - @AfterReturning( - value = "@annotation(MessageIntercept) && args(participantRegisterMessage,..)", - returning = "isCommissioning") - public void handleParticipantRegister(ParticipantRegister participantRegisterMessage, boolean isCommissioning) { - if (isCommissioning) { - executor.execute(() -> supervisionScanner.handleParticipantRegister(new ImmutablePair<>( - participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()))); - } - } - - @Before("@annotation(MessageIntercept) && args(participantUpdateAckMessage,..)") - public void handleParticipantUpdateAck(ParticipantUpdateAck participantUpdateAckMessage) { - executor.execute(() -> supervisionScanner.handleParticipantUpdateAck(new ImmutablePair<>( - participantUpdateAckMessage.getParticipantId(), participantUpdateAckMessage.getParticipantType()))); - } - @Override public void close() throws IOException { executor.shutdown(); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index de67360f8..9a80508e5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -34,6 +34,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAck import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionException; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementAck; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState; import org.onap.policy.clamp.models.acm.concepts.Participant; @@ -67,9 +68,9 @@ public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private static final String AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE = - "Automation composition can't transition from state "; + "Automation composition can't transition from state "; private static final String AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE = - "Automation composition is already in state "; + "Automation composition is already in state "; private static final String TO_STATE = " to state "; private static final String AND_TRANSITIONING_TO_STATE = " and transitioning to state "; @@ -107,7 +108,7 @@ public class SupervisionHandler { */ @MessageIntercept @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") - public boolean handleParticipantMessage(ParticipantRegister participantRegisterMessage) { + public void handleParticipantMessage(ParticipantRegister participantRegisterMessage) { LOGGER.debug("Participant Register received {}", participantRegisterMessage); try { checkParticipant(participantRegisterMessage, ParticipantState.UNKNOWN, ParticipantHealthStatus.UNKNOWN); @@ -115,12 +116,11 @@ public class SupervisionHandler { LOGGER.warn("error saving participant {}", participantRegisterMessage.getParticipantId(), svExc); } - var isCommissioning = participantUpdatePublisher.sendCommissioning(null, null, - participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()); + participantUpdatePublisher.sendCommissioning(participantRegisterMessage.getParticipantId(), + participantRegisterMessage.getParticipantType()); participantRegisterAckPublisher.send(participantRegisterMessage.getMessageId(), - participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()); - return isCommissioning; + participantRegisterMessage.getParticipantId(), participantRegisterMessage.getParticipantType()); } /** @@ -134,8 +134,8 @@ public class SupervisionHandler { LOGGER.debug("Participant Deregister received {}", participantDeregisterMessage); try { var participantOpt = - participantProvider.findParticipant(participantDeregisterMessage.getParticipantId().getName(), - participantDeregisterMessage.getParticipantId().getVersion()); + participantProvider.findParticipant(participantDeregisterMessage.getParticipantId().getName(), + participantDeregisterMessage.getParticipantId().getVersion()); if (participantOpt.isPresent()) { var participant = participantOpt.get(); @@ -145,7 +145,7 @@ public class SupervisionHandler { } } catch (PfModelException pfme) { LOGGER.warn("Model exception occured with participant id {}", - participantDeregisterMessage.getParticipantId()); + participantDeregisterMessage.getParticipantId()); } participantDeregisterAckPublisher.send(participantDeregisterMessage.getMessageId()); @@ -162,8 +162,8 @@ public class SupervisionHandler { LOGGER.debug("Participant Update Ack received {}", participantUpdateAckMessage); try { var participantOpt = - participantProvider.findParticipant(participantUpdateAckMessage.getParticipantId().getName(), - participantUpdateAckMessage.getParticipantId().getVersion()); + participantProvider.findParticipant(participantUpdateAckMessage.getParticipantId().getName(), + participantUpdateAckMessage.getParticipantId().getVersion()); if (participantOpt.isPresent()) { var participant = participantOpt.get(); @@ -174,29 +174,28 @@ public class SupervisionHandler { } } catch (PfModelException pfme) { LOGGER.warn("Model exception occured with participant id {}", - participantUpdateAckMessage.getParticipantId()); + participantUpdateAckMessage.getParticipantId()); } } /** * Send commissioning update message to dmaap. * - * @param name the ToscaServiceTemplate name - * @param version the ToscaServiceTemplate version + * @param acmDefinition the AutomationComposition Definition */ - public void handleSendCommissionMessage(String name, String version) { - LOGGER.debug("Participant update message with serviveTemplate {} {} being sent to all participants", name, - version); - participantUpdatePublisher.sendComissioningBroadcast(name, version); + public void handleSendCommissionMessage(AutomationCompositionDefinition acmDefinition) { + LOGGER.debug("Participant update message with serviveTemplate {} being sent to all participants", + acmDefinition.getCompositionId()); + participantUpdatePublisher.sendComissioningBroadcast(acmDefinition); } /** * Send decommissioning update message to dmaap. * */ - public void handleSendDeCommissionMessage() { - LOGGER.debug("Participant update message being sent"); - participantUpdatePublisher.sendDecomisioning(); + public void handleSendDeCommissionMessage(UUID compositionId) { + LOGGER.debug("Participant update message being sent {}", compositionId); + participantUpdatePublisher.sendDecomisioning(compositionId); } /** @@ -205,8 +204,9 @@ public class SupervisionHandler { * @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant */ @MessageIntercept - @Timed(value = "listener.automation_composition_update_ack", - description = "AUTOMATION_COMPOSITION_UPDATE_ACK messages received") + @Timed( + value = "listener.automation_composition_update_ack", + description = "AUTOMATION_COMPOSITION_UPDATE_ACK messages received") public void handleAutomationCompositionUpdateAckMessage(AutomationCompositionAck automationCompositionAckMessage) { LOGGER.debug("AutomationComposition Update Ack message received {}", automationCompositionAckMessage); setAcElementStateInDb(automationCompositionAckMessage); @@ -218,10 +218,11 @@ public class SupervisionHandler { * @param automationCompositionAckMessage the AutomationCompositionAck message received from a participant */ @MessageIntercept - @Timed(value = "listener.automation_composition_statechange_ack", - description = "AUTOMATION_COMPOSITION_STATECHANGE_ACK messages received") + @Timed( + value = "listener.automation_composition_statechange_ack", + description = "AUTOMATION_COMPOSITION_STATECHANGE_ACK messages received") public void handleAutomationCompositionStateChangeAckMessage( - AutomationCompositionAck automationCompositionAckMessage) { + AutomationCompositionAck automationCompositionAckMessage) { LOGGER.debug("AutomationComposition StateChange Ack message received {}", automationCompositionAckMessage); setAcElementStateInDb(automationCompositionAckMessage); } @@ -245,7 +246,7 @@ public class SupervisionHandler { } private boolean updateState(AutomationComposition automationComposition, - Set<Map.Entry<UUID, AutomationCompositionElementAck>> automationCompositionResultSet) { + Set<Map.Entry<UUID, AutomationCompositionElementAck>> automationCompositionResultSet) { var updated = false; for (var acElementAck : automationCompositionResultSet) { var element = automationComposition.getElements().get(acElementAck.getKey()); @@ -262,9 +263,9 @@ public class SupervisionHandler { if (acElements != null) { Boolean primedFlag = true; var checkOpt = automationComposition.getElements().values().stream() - .filter(acElement -> (!acElement.getState().equals(AutomationCompositionState.PASSIVE) - || !acElement.getState().equals(AutomationCompositionState.RUNNING))) - .findAny(); + .filter(acElement -> (!acElement.getState().equals(AutomationCompositionState.PASSIVE) + || !acElement.getState().equals(AutomationCompositionState.RUNNING))) + .findAny(); if (checkOpt.isEmpty()) { primedFlag = false; } @@ -283,7 +284,7 @@ public class SupervisionHandler { * @throws AutomationCompositionException on supervision errors */ public void triggerAutomationCompositionSupervision(AutomationComposition automationComposition) - throws AutomationCompositionException { + throws AutomationCompositionException { switch (automationComposition.getOrderedState()) { case UNINITIALISED: superviseAutomationCompositionUninitialization(automationComposition); @@ -299,8 +300,8 @@ public class SupervisionHandler { default: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - "A automation composition cannot be commanded to go into state " - + automationComposition.getOrderedState().name()); + "A automation composition cannot be commanded to go into state " + + automationComposition.getOrderedState().name()); } } @@ -313,39 +314,39 @@ public class SupervisionHandler { * @throws AutomationCompositionException on supervision errors */ private void superviseAutomationCompositionUninitialization(AutomationComposition automationComposition) - throws AutomationCompositionException { + throws AutomationCompositionException { switch (automationComposition.getState()) { case UNINITIALISED: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); break; case UNINITIALISED2PASSIVE: case PASSIVE: automationComposition.setState(AutomationCompositionState.PASSIVE2UNINITIALISED); automationCompositionStateChangePublisher.send(automationComposition, - getFirstStartPhase(automationComposition)); + getFirstStartPhase(automationComposition)); break; case PASSIVE2UNINITIALISED: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() - + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() + + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); break; default: exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE - + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); + + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); break; } } private void superviseAutomationCompositionPassivation(AutomationComposition automationComposition) - throws AutomationCompositionException { + throws AutomationCompositionException { switch (automationComposition.getState()) { case PASSIVE: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); break; case UNINITIALISED: automationComposition.setState(AutomationCompositionState.UNINITIALISED2PASSIVE); @@ -355,46 +356,46 @@ public class SupervisionHandler { case UNINITIALISED2PASSIVE: case RUNNING2PASSIVE: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() - + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() + + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); break; case RUNNING: automationComposition.setState(AutomationCompositionState.RUNNING2PASSIVE); automationCompositionStateChangePublisher.send(automationComposition, - getFirstStartPhase(automationComposition)); + getFirstStartPhase(automationComposition)); break; default: exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE - + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); + + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); break; } } private void superviseAutomationCompositionActivation(AutomationComposition automationComposition) - throws AutomationCompositionException { + throws AutomationCompositionException { switch (automationComposition.getState()) { case RUNNING: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name()); break; case PASSIVE2RUNNING: exceptionOccured(Response.Status.NOT_ACCEPTABLE, - AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() - + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); + AUTOMATION_COMPOSITION_IS_ALREADY_IN_STATE + automationComposition.getState().name() + + AND_TRANSITIONING_TO_STATE + automationComposition.getOrderedState()); break; case PASSIVE: automationComposition.setState(AutomationCompositionState.PASSIVE2RUNNING); automationCompositionStateChangePublisher.send(automationComposition, - getFirstStartPhase(automationComposition)); + getFirstStartPhase(automationComposition)); break; default: exceptionOccured(Response.Status.NOT_ACCEPTABLE, AUTOMATION_COMPOSITION_CANNOT_TRANSITION_FROM_STATE - + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); + + automationComposition.getState().name() + TO_STATE + automationComposition.getOrderedState()); break; } } @@ -405,12 +406,12 @@ public class SupervisionHandler { } private void checkParticipant(ParticipantMessage participantMessage, ParticipantState participantState, - ParticipantHealthStatus healthStatus) throws AutomationCompositionException, PfModelException { + ParticipantHealthStatus healthStatus) throws AutomationCompositionException, PfModelException { if (participantMessage.getParticipantId() == null) { exceptionOccured(Response.Status.NOT_FOUND, "Participant ID on PARTICIPANT_STATUS message is null"); } var participantOpt = participantProvider.findParticipant(participantMessage.getParticipantId().getName(), - participantMessage.getParticipantId().getVersion()); + participantMessage.getParticipantId().getVersion()); if (participantOpt.isEmpty()) { var participant = new Participant(); @@ -432,10 +433,10 @@ public class SupervisionHandler { } private void superviseParticipant(ParticipantStatus participantStatusMessage) - throws PfModelException, AutomationCompositionException { + throws PfModelException, AutomationCompositionException { checkParticipant(participantStatusMessage, participantStatusMessage.getState(), - participantStatusMessage.getHealthStatus()); + participantStatusMessage.getHealthStatus()); } private void exceptionOccured(Response.Status status, String reason) throws AutomationCompositionException { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index 129569b6b..a71f49c10 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -24,14 +24,12 @@ package org.onap.policy.clamp.acm.runtime.supervision; import java.util.HashMap; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; +import java.util.UUID; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionUpdatePublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPublisher; -import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantUpdatePublisher; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; -import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionState; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantHealthStatus; @@ -41,7 +39,6 @@ import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositi import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; -import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,12 +51,10 @@ import org.springframework.stereotype.Component; public class SupervisionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionScanner.class); - private final HandleCounter<ToscaConceptIdentifier> automationCompositionCounter = new HandleCounter<>(); + private final HandleCounter<UUID> automationCompositionCounter = new HandleCounter<>(); private final HandleCounter<ToscaConceptIdentifier> participantStatusCounter = new HandleCounter<>(); - private final HandleCounter<Pair<ToscaConceptIdentifier, ToscaConceptIdentifier>> participantUpdateCounter = - new HandleCounter<>(); - private final Map<ToscaConceptIdentifier, Integer> phaseMap = new HashMap<>(); + private final Map<UUID, Integer> phaseMap = new HashMap<>(); private final AutomationCompositionProvider automationCompositionProvider; private final AcDefinitionProvider acDefinitionProvider; @@ -67,7 +62,6 @@ public class SupervisionScanner { private final AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher; private final ParticipantProvider participantProvider; private final ParticipantStatusReqPublisher participantStatusReqPublisher; - private final ParticipantUpdatePublisher participantUpdatePublisher; /** * Constructor for instantiating SupervisionScanner. @@ -78,7 +72,6 @@ public class SupervisionScanner { * @param automationCompositionUpdatePublisher the AutomationCompositionUpdate Publisher * @param participantProvider the Participant Provider * @param participantStatusReqPublisher the Participant StatusReq Publisher - * @param participantUpdatePublisher the Participant Update Publisher * @param acRuntimeParameterGroup the parameters for the automation composition runtime */ public SupervisionScanner(final AutomationCompositionProvider automationCompositionProvider, @@ -86,7 +79,6 @@ public class SupervisionScanner { final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher, AutomationCompositionUpdatePublisher automationCompositionUpdatePublisher, ParticipantProvider participantProvider, ParticipantStatusReqPublisher participantStatusReqPublisher, - ParticipantUpdatePublisher participantUpdatePublisher, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.automationCompositionProvider = automationCompositionProvider; this.acDefinitionProvider = acDefinitionProvider; @@ -94,18 +86,12 @@ public class SupervisionScanner { this.automationCompositionUpdatePublisher = automationCompositionUpdatePublisher; this.participantProvider = participantProvider; this.participantStatusReqPublisher = participantStatusReqPublisher; - this.participantUpdatePublisher = participantUpdatePublisher; automationCompositionCounter.setMaxRetryCount( acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); automationCompositionCounter .setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs()); - participantUpdateCounter.setMaxRetryCount( - acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); - participantUpdateCounter - .setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxWaitMs()); - participantStatusCounter.setMaxRetryCount( acRuntimeParameterGroup.getParticipantParameters().getUpdateParameters().getMaxRetryCount()); participantStatusCounter.setMaxWaitMs(acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs()); @@ -130,48 +116,17 @@ public class SupervisionScanner { } } - try { - var list = acDefinitionProvider.getAllAcDefinitions(); - for (var acDefinition : list) { - var acList = - automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); - for (var automationComposition : acList) { - scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate(), counterCheck); - } + var list = acDefinitionProvider.getAllAcDefinitions(); + for (var acDefinition : list) { + var acList = automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); + for (var automationComposition : acList) { + scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate(), counterCheck); } - } catch (PfModelException pfme) { - LOGGER.warn("error reading automation compositions from database", pfme); - } - - if (counterCheck) { - scanParticipantUpdate(); } LOGGER.debug("Automation composition scan complete . . ."); } - private void scanParticipantUpdate() { - LOGGER.debug("Scanning participants to update . . ."); - - for (var id : participantUpdateCounter.keySet()) { - if (participantUpdateCounter.isFault(id)) { - LOGGER.debug("report Participant Update fault"); - - } else if (participantUpdateCounter.getDuration(id) > participantUpdateCounter.getMaxWaitMs()) { - - if (participantUpdateCounter.count(id)) { - LOGGER.debug("retry message ParticipantUpdate"); - participantUpdatePublisher.sendCommissioning(null, null, id.getLeft(), id.getRight()); - } else { - LOGGER.debug("report Participant Update fault"); - participantUpdateCounter.setFault(id); - } - } - } - - LOGGER.debug("Participants to update scan complete . . ."); - } - private void scanParticipantStatus(Participant participant) throws PfModelException { ToscaConceptIdentifier id = participant.getKey().asIdentifier(); if (participantStatusCounter.isFault(id)) { @@ -199,20 +154,12 @@ public class SupervisionScanner { participantStatusCounter.clear(id); } - public void handleParticipantRegister(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { - participantUpdateCounter.clear(id); - } - - public void handleParticipantUpdateAck(Pair<ToscaConceptIdentifier, ToscaConceptIdentifier> id) { - participantUpdateCounter.remove(id); - } - private void scanAutomationComposition(final AutomationComposition automationComposition, - ToscaServiceTemplate toscaServiceTemplate, boolean counterCheck) throws PfModelException { - LOGGER.debug("scanning automation composition {} . . .", automationComposition.getKey().asIdentifier()); + ToscaServiceTemplate toscaServiceTemplate, boolean counterCheck) { + LOGGER.debug("scanning automation composition {} . . .", automationComposition.getInstanceId()); if (automationComposition.getState().equals(automationComposition.getOrderedState().asState())) { - LOGGER.debug("automation composition {} scanned, OK", automationComposition.getKey().asIdentifier()); + LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId()); // Clear missed report counter on automation composition clearFaultAndCounter(automationComposition); @@ -224,8 +171,8 @@ public class SupervisionScanner { var maxSpNotCompleted = 0; // max startPhase not completed var defaultMin = 1000; // min startPhase var defaultMax = 0; // max startPhase - for (AutomationCompositionElement element : automationComposition.getElements().values()) { - ToscaNodeTemplate toscaNodeTemplate = toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates() + for (var element : automationComposition.getElements().values()) { + var toscaNodeTemplate = toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates() .get(element.getDefinition().getName()); int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties()); defaultMin = Math.min(defaultMin, startPhase); @@ -262,36 +209,36 @@ public class SupervisionScanner { ? defaultMin : defaultMax; - if (nextSpNotCompleted != phaseMap.getOrDefault(automationComposition.getKey().asIdentifier(), + if (nextSpNotCompleted != phaseMap.getOrDefault(automationComposition.getInstanceId(), firstStartPhase)) { - phaseMap.put(automationComposition.getKey().asIdentifier(), nextSpNotCompleted); + phaseMap.put(automationComposition.getInstanceId(), nextSpNotCompleted); sendAutomationCompositionMsg(automationComposition, nextSpNotCompleted); } else if (counterCheck) { - phaseMap.put(automationComposition.getKey().asIdentifier(), nextSpNotCompleted); + phaseMap.put(automationComposition.getInstanceId(), nextSpNotCompleted); handleCounter(automationComposition, nextSpNotCompleted); } } } private void clearFaultAndCounter(AutomationComposition automationComposition) { - automationCompositionCounter.clear(automationComposition.getKey().asIdentifier()); - phaseMap.clear(); + automationCompositionCounter.clear(automationComposition.getInstanceId()); + phaseMap.remove(automationComposition.getInstanceId()); } private void handleCounter(AutomationComposition automationComposition, int startPhase) { - ToscaConceptIdentifier id = automationComposition.getKey().asIdentifier(); - if (automationCompositionCounter.isFault(id)) { + var instanceId = automationComposition.getInstanceId(); + if (automationCompositionCounter.isFault(instanceId)) { LOGGER.debug("report AutomationComposition fault"); return; } - if (automationCompositionCounter.getDuration(id) > automationCompositionCounter.getMaxWaitMs()) { - if (automationCompositionCounter.count(id)) { - phaseMap.put(id, startPhase); + if (automationCompositionCounter.getDuration(instanceId) > automationCompositionCounter.getMaxWaitMs()) { + if (automationCompositionCounter.count(instanceId)) { + phaseMap.put(instanceId, startPhase); sendAutomationCompositionMsg(automationComposition, startPhase); } else { LOGGER.debug("report AutomationComposition fault"); - automationCompositionCounter.setFault(id); + automationCompositionCounter.setFault(instanceId); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java index 4c3cc958b..8d6378e3f 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangePublisher.java @@ -43,6 +43,7 @@ public class AutomationCompositionStateChangePublisher description = "AUTOMATION_COMPOSITION_STATE_CHANGE messages published") public void send(AutomationComposition automationComposition, int startPhase) { var acsc = new AutomationCompositionStateChange(); + acsc.setCompositionId(automationComposition.getCompositionId()); acsc.setAutomationCompositionId(automationComposition.getInstanceId()); acsc.setMessageId(UUID.randomUUID()); acsc.setOrderedState(automationComposition.getOrderedState()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java index c10e47bb4..7b114c920 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdatePublisher.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; -import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.ParticipantUpdates; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionUpdate; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; @@ -69,6 +68,7 @@ public class AutomationCompositionUpdatePublisher extends AbstractParticipantPub description = "AUTOMATION_COMPOSITION_UPDATE messages published") public void send(AutomationComposition automationComposition, int startPhase) { var automationCompositionUpdateMsg = new AutomationCompositionUpdate(); + automationCompositionUpdateMsg.setCompositionId(automationComposition.getCompositionId()); automationCompositionUpdateMsg.setStartPhase(startPhase); automationCompositionUpdateMsg.setAutomationCompositionId(automationComposition.getInstanceId()); automationCompositionUpdateMsg.setMessageId(UUID.randomUUID()); @@ -76,7 +76,7 @@ public class AutomationCompositionUpdatePublisher extends AbstractParticipantPub var toscaServiceTemplate = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); List<ParticipantUpdates> participantUpdates = new ArrayList<>(); - for (AutomationCompositionElement element : automationComposition.getElements().values()) { + for (var element : automationComposition.getElements().values()) { AcmUtils.setAcPolicyInfo(element, toscaServiceTemplate); AcmUtils.prepareParticipantUpdate(element, participantUpdates); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java index fa5e423d7..af9023815 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantUpdatePublisher.java @@ -26,7 +26,9 @@ import io.micrometer.core.annotation.Timed; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import lombok.AllArgsConstructor; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.messages.dmaap.participant.ParticipantUpdate; @@ -51,47 +53,56 @@ public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<Par /** * Send ParticipantUpdate to all Participants. * - * @param name the ToscaServiceTemplate name - * @param version the ToscaServiceTemplate version + * @param acmDefinition the AutomationComposition Definition */ @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published") - public void sendComissioningBroadcast(String name, String version) { - sendCommissioning(name, version, null, null); + public void sendComissioningBroadcast(AutomationCompositionDefinition acmDefinition) { + sendCommissioning(acmDefinition, null, null); } /** * Send ParticipantUpdate to Participant * if participantType and participantId are null then message is broadcast. * - * @param name the ToscaServiceTemplate name - * @param version the ToscaServiceTemplate version * @param participantType the ParticipantType * @param participantId the ParticipantId */ @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published") - public boolean sendCommissioning(String name, String version, ToscaConceptIdentifier participantType, - ToscaConceptIdentifier participantId) { + public void sendCommissioning(ToscaConceptIdentifier participantType, ToscaConceptIdentifier participantId) { + var list = acDefinitionProvider.getAllAcDefinitions(); + if (list.isEmpty()) { + LOGGER.warn("No tosca service template found, cannot send participantupdate"); + } + for (var acmDefinition : list) { + sendCommissioning(acmDefinition, participantType, participantId); + } + } + + /** + * Send ParticipantUpdate to Participant + * if participantType and participantId are null then message is broadcast. + * + * @param acmDefinition the AutomationComposition Definition + * @param participantType the ParticipantType + * @param participantId the ParticipantId + */ + @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published") + public void sendCommissioning(AutomationCompositionDefinition acmDefinition, + ToscaConceptIdentifier participantType, ToscaConceptIdentifier participantId) { var message = new ParticipantUpdate(); + message.setCompositionId(acmDefinition.getCompositionId()); message.setParticipantType(participantType); message.setParticipantId(participantId); message.setTimestamp(Instant.now()); - var list = acDefinitionProvider.getServiceTemplateList(name, version); - if (list.isEmpty()) { - LOGGER.warn("No tosca service template found, cannot send participantupdate {} {}", name, version); - return false; - } - var toscaServiceTemplate = list.get(0); - var commonPropertiesMap = AcmUtils.getCommonOrInstancePropertiesFromNodeTypes(true, toscaServiceTemplate); - + var toscaServiceTemplate = acmDefinition.getServiceTemplate(); List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>(); for (var toscaInputEntry : toscaServiceTemplate.getToscaTopologyTemplate().getNodeTemplates().entrySet()) { if (ParticipantUtils.checkIfNodeTemplateIsAutomationCompositionElement(toscaInputEntry.getValue(), toscaServiceTemplate)) { AcmUtils.prepareParticipantDefinitionUpdate( ParticipantUtils.findParticipantType(toscaInputEntry.getValue().getProperties()), - toscaInputEntry.getKey(), toscaInputEntry.getValue(), participantDefinitionUpdates, - commonPropertiesMap); + toscaInputEntry.getKey(), toscaInputEntry.getValue(), participantDefinitionUpdates); } } @@ -99,15 +110,15 @@ public class ParticipantUpdatePublisher extends AbstractParticipantPublisher<Par message.setParticipantDefinitionUpdates(participantDefinitionUpdates); LOGGER.debug("Participant Update sent {}", message); super.send(message); - return true; } /** * Send ParticipantUpdate to Participant after that commissioning has been removed. */ @Timed(value = "publisher.participant_update", description = "PARTICIPANT_UPDATE messages published") - public void sendDecomisioning() { + public void sendDecomisioning(UUID compositionId) { var message = new ParticipantUpdate(); + message.setCompositionId(compositionId); message.setTimestamp(Instant.now()); // DeCommission the automation composition but deleting participantdefinitions on participants message.setParticipantDefinitionUpdates(null); |