diff options
Diffstat (limited to 'runtime-acm/src/main')
35 files changed, 734 insertions, 544 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index 74ccb9cc6..48b139495 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; @@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; @@ -56,7 +56,7 @@ public class CommissioningProvider { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionProvider acProvider; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcTypeStateResolver acTypeStateResolver; private final ParticipantPrimePublisher participantPrimePublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; @@ -188,10 +188,6 @@ public class CommissioningProvider { throw new PfModelRuntimeException(Status.BAD_REQUEST, "There are instances, Priming/Depriming not allowed"); } var acmDefinition = acDefinitionProvider.getAcDefinition(compositionId); - if (acmDefinition.getRestarting() != null) { - throw new PfModelRuntimeException(Status.BAD_REQUEST, - "There is a restarting process, Priming/Depriming not allowed"); - } var stateOrdered = acTypeStateResolver.resolve(acTypeStateUpdate.getPrimeOrder(), acmDefinition.getState(), acmDefinition.getStateChangeResult()); switch (stateOrdered) { @@ -229,7 +225,7 @@ public class CommissioningProvider { } } if (!participantIds.isEmpty()) { - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); } acmDefinition.setState(AcTypeState.DEPRIMING); acmDefinition.setLastMsg(TimestampHelper.now()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java index 084f7c774..f5c8368e2 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.config; import io.micrometer.core.aop.TimedAspect; import io.micrometer.core.instrument.MeterRegistry; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,8 +35,9 @@ public class MetricsConfiguration { * Load up the metrics registry. */ @Bean - public InitializingBean forcePrometheusPostProcessor(BeanPostProcessor meterRegistryPostProcessor, - MeterRegistry registry) { + public InitializingBean forcePrometheusPostProcessor(@Qualifier("meterRegistryPostProcessor") + BeanPostProcessor meterRegistryPostProcessor, + MeterRegistry registry) { return () -> meterRegistryPostProcessor.postProcessAfterInitialization(registry, ""); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java index 3727333a4..05d47d4f7 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java @@ -35,7 +35,7 @@ import org.springframework.context.annotation.Configuration; public class OpenTelConfiguration { @Bean - @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false) + @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true") @ConditionalOnExpression("'http'.equals('${tracing.exporter.protocol}')") OtlpHttpSpanExporter otlpHttpSpanExporter(@Value("${tracing.exporter.endpoint:http://jaeger:4318/v1/traces}") String url) { return OtlpHttpSpanExporter.builder() @@ -44,7 +44,7 @@ public class OpenTelConfiguration { } @Bean - @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false) + @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true") @ConditionalOnExpression("'grpc'.equals('${tracing.exporter.protocol}')") OtlpGrpcSpanExporter otlpGrpcSpanExporter(@Value("${tracing.exporter.endpoint:http://jaeger:4317}") String url) { return OtlpGrpcSpanExporter.builder() @@ -53,7 +53,7 @@ public class OpenTelConfiguration { } @Bean - @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false) + @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true") JaegerRemoteSampler jaegerRemoteSampler( @Value("${tracing.sampler.jaeger-remote.endpoint:http://jaeger:14250}") String url, @Value("${SERVICE_ID:unknown_service}") String serviceId) { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java index 2e75db12e..9d50fc739 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java @@ -45,16 +45,16 @@ public class SecurityConfig { */ @Bean public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception { - http - .httpBasic(Customizer.withDefaults()) - .authorizeHttpRequests(authorize -> { - if (useBasicAuth) { - authorize.anyRequest().authenticated(); - } else { - authorize.anyRequest().permitAll(); - } - }) - .csrf(AbstractHttpConfigurer::disable); + if (useBasicAuth) { + http + .httpBasic(Customizer.withDefaults()) + .authorizeHttpRequests(authorize -> authorize.anyRequest().authenticated()); + } else { + http + .authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll()); + } + + http.csrf(AbstractHttpConfigurer::disable); return http.build(); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java index a3e55c3f7..6d8e50285 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java @@ -22,18 +22,17 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; import java.io.Closeable; -import java.io.IOException; import java.util.List; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import lombok.Getter; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.message.bus.event.TopicEndpointManager; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.common.utils.services.ServiceManagerContainer; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; @@ -139,7 +138,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen } @Override - public void close() throws IOException { + public void close() { if (isAlive()) { super.shutdown(); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java index a76a09d99..09408adea 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java @@ -20,9 +20,7 @@ package org.onap.policy.clamp.acm.runtime.config.messaging; -import java.util.List; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSink; /** * Publisher. diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java index 220636b9d..42af70596 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java @@ -22,14 +22,13 @@ package org.onap.policy.clamp.acm.runtime.instantiation; import jakarta.validation.Valid; -import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; +import java.util.List; import java.util.UUID; import java.util.stream.Collectors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; @@ -38,15 +37,21 @@ import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.clamp.models.acm.concepts.SubState; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.AcInstanceStateUpdate; +import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; import org.onap.policy.clamp.models.acm.messages.rest.instantiation.InstantiationResponse; +import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder; +import org.onap.policy.clamp.models.acm.messages.rest.instantiation.SubOrder; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; +import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.models.base.PfConceptKey; import org.onap.policy.models.base.PfKey; import org.onap.policy.models.base.PfModelRuntimeException; import org.slf4j.Logger; @@ -62,6 +67,7 @@ import org.springframework.transaction.annotation.Transactional; @RequiredArgsConstructor public class AutomationCompositionInstantiationProvider { private static final String DO_NOT_MATCH = " do not match with "; + private static final String ELEMENT_ID_NOT_PRESENT = "Element id not present "; private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionInstantiationProvider.class); @@ -69,7 +75,7 @@ public class AutomationCompositionInstantiationProvider { private final AcDefinitionProvider acDefinitionProvider; private final AcInstanceStateResolver acInstanceStateResolver; private final SupervisionAcHandler supervisionAcHandler; - private final AcmParticipantProvider acmParticipantProvider; + private final ParticipantProvider participantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -82,19 +88,19 @@ public class AutomationCompositionInstantiationProvider { public InstantiationResponse createAutomationComposition(UUID compositionId, AutomationComposition automationComposition) { if (!compositionId.equals(automationComposition.getCompositionId())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId); } var checkAutomationCompositionOpt = automationCompositionProvider.findAutomationComposition(automationComposition.getKey().asIdentifier()); if (checkAutomationCompositionOpt.isPresent()) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getKey().asIdentifier() + " already defined"); } var validationResult = validateAutomationComposition(automationComposition); if (!validationResult.isValid()) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult()); + throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult()); } automationComposition = automationCompositionProvider.createAutomationComposition(automationComposition); @@ -120,7 +126,7 @@ public class AutomationCompositionInstantiationProvider { var instanceId = automationComposition.getInstanceId(); var acToUpdate = automationCompositionProvider.getAutomationComposition(instanceId); if (!compositionId.equals(acToUpdate.getCompositionId())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId); } if (DeployState.UNDEPLOYED.equals(acToUpdate.getDeployState())) { @@ -131,22 +137,38 @@ public class AutomationCompositionInstantiationProvider { acToUpdate.setDerivedFrom(automationComposition.getDerivedFrom()); var validationResult = validateAutomationComposition(acToUpdate); if (!validationResult.isValid()) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult()); + throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult()); } automationComposition = automationCompositionProvider.updateAutomationComposition(acToUpdate); return createInstantiationResponse(automationComposition); - } else if ((DeployState.DEPLOYED.equals(acToUpdate.getDeployState()) - || DeployState.UPDATING.equals(acToUpdate.getDeployState())) - && LockState.LOCKED.equals(acToUpdate.getLockState())) { - if (automationComposition.getCompositionTargetId() != null) { - return migrateAutomationComposition(automationComposition, acToUpdate); + } + + var deployOrder = DeployOrder.UPDATE; + var subOrder = SubOrder.NONE; + + if (automationComposition.getCompositionTargetId() != null) { + + if (Boolean.TRUE.equals(automationComposition.getPrecheck())) { + subOrder = SubOrder.MIGRATE_PRECHECK; + deployOrder = DeployOrder.NONE; } else { - return updateDeployedAutomationComposition(automationComposition, acToUpdate); + deployOrder = DeployOrder.MIGRATE; } } - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, - "Not allowed to update in the state " + acToUpdate.getDeployState()); + var result = acInstanceStateResolver.resolve(deployOrder, LockOrder.NONE, subOrder, + acToUpdate.getDeployState(), acToUpdate.getLockState(), acToUpdate.getSubState(), + acToUpdate.getStateChangeResult()); + return switch (result) { + case "UPDATE" -> updateDeployedAutomationComposition(automationComposition, acToUpdate); + + case "MIGRATE" -> migrateAutomationComposition(automationComposition, acToUpdate); + + case "MIGRATE_PRECHECK" -> migratePrecheckAc(automationComposition, acToUpdate); + + default -> throw new PfModelRuntimeException(Status.BAD_REQUEST, + "Not allowed to " + deployOrder + " in the state " + acToUpdate.getDeployState()); + }; } /** @@ -164,17 +186,14 @@ public class AutomationCompositionInstantiationProvider { var elementId = element.getKey(); var dbAcElement = acToBeUpdated.getElements().get(elementId); if (dbAcElement == null) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, "Element id not present " + elementId); + throw new PfModelRuntimeException(Status.BAD_REQUEST, ELEMENT_ID_NOT_PRESENT + elementId); } AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties()); } - if (automationComposition.getRestarting() != null) { - throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Update not allowed"); - } var validationResult = validateAutomationComposition(acToBeUpdated); if (!validationResult.isValid()) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult()); + throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult()); } // Publish property update event to the participants supervisionAcHandler.update(acToBeUpdated); @@ -187,47 +206,104 @@ public class AutomationCompositionInstantiationProvider { AutomationComposition automationComposition, AutomationComposition acToBeUpdated) { if (!DeployState.DEPLOYED.equals(acToBeUpdated.getDeployState())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Not allowed to migrate in the state " + acToBeUpdated.getDeployState()); } - if (automationComposition.getRestarting() != null) { - throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Migrate not allowed"); - } // Iterate and update the element property values for (var element : automationComposition.getElements().entrySet()) { var elementId = element.getKey(); var dbAcElement = acToBeUpdated.getElements().get(elementId); + // Add additional elements if present for migration if (dbAcElement == null) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, "Element id not present " + elementId); - } - AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties()); - var newDefinition = element.getValue().getDefinition(); - var compatibility = - newDefinition.asConceptKey().getCompatibility(dbAcElement.getDefinition().asConceptKey()); - if (PfKey.Compatibility.DIFFERENT.equals(compatibility)) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, - dbAcElement.getDefinition() + " is not compatible with " + newDefinition); - } - if (PfKey.Compatibility.MAJOR.equals(compatibility) || PfKey.Compatibility.MINOR.equals(compatibility)) { - LOGGER.warn("Migrate {}: Version {} has {} compatibility with {} ", - automationComposition.getInstanceId(), newDefinition, compatibility, dbAcElement.getDefinition()); + LOGGER.info("New Ac element {} added in Migration", elementId); + acToBeUpdated.getElements().put(elementId, element.getValue()); + } else { + AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties()); + var newDefinition = element.getValue().getDefinition().asConceptKey(); + var dbElementDefinition = dbAcElement.getDefinition().asConceptKey(); + checkCompatibility(newDefinition, dbElementDefinition, automationComposition.getInstanceId()); + dbAcElement.setDefinition(element.getValue().getDefinition()); } - dbAcElement.setDefinition(element.getValue().getDefinition()); } + // Remove element which is not present in the new Ac instance + var elementsRemoved = getElementRemoved(acToBeUpdated, automationComposition); + elementsRemoved.forEach(uuid -> acToBeUpdated.getElements().remove(uuid)); var validationResult = - validateAutomationComposition(acToBeUpdated, automationComposition.getCompositionTargetId()); + validateAutomationComposition(acToBeUpdated, automationComposition.getCompositionTargetId()); if (!validationResult.isValid()) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult()); + throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult()); } acToBeUpdated.setCompositionTargetId(automationComposition.getCompositionTargetId()); + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionTargetId()); + // Publish migrate event to the participants + supervisionAcHandler.migrate(acToBeUpdated, acDefinition.getServiceTemplate()); + + var ac = automationCompositionProvider.updateAutomationComposition(acToBeUpdated); + elementsRemoved.forEach(automationCompositionProvider::deleteAutomationCompositionElement); + return createInstantiationResponse(ac); + } + + private List<UUID> getElementRemoved(AutomationComposition acFromDb, AutomationComposition acFromMigration) { + return acFromDb.getElements().keySet().stream() + .filter(id -> acFromMigration.getElements().get(id) == null).toList(); + } + + void checkCompatibility(PfConceptKey newDefinition, PfConceptKey dbElementDefinition, + UUID instanceId) { + var compatibility = newDefinition.getCompatibility(dbElementDefinition); + if (PfKey.Compatibility.DIFFERENT.equals(compatibility)) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, + dbElementDefinition + " is not compatible with " + newDefinition); + } + if (PfKey.Compatibility.MAJOR.equals(compatibility) || PfKey.Compatibility.MINOR + .equals(compatibility)) { + LOGGER.warn("Migrate {}: Version {} has {} compatibility with {} ", instanceId, newDefinition, + compatibility, dbElementDefinition); + } + } + + private InstantiationResponse migratePrecheckAc( + AutomationComposition automationComposition, AutomationComposition acToBeUpdated) { + + acToBeUpdated.setPrecheck(true); + var copyAc = new AutomationComposition(acToBeUpdated); + // Iterate and update the element property values + for (var element : automationComposition.getElements().entrySet()) { + var elementId = element.getKey(); + var copyElement = copyAc.getElements().get(elementId); + // Add additional elements if present for migration + if (copyElement == null) { + LOGGER.info("New Ac element {} added in Migration", elementId); + copyAc.getElements().put(elementId, element.getValue()); + } else { + AcmUtils.recursiveMerge(copyElement.getProperties(), element.getValue().getProperties()); + var newDefinition = element.getValue().getDefinition().asConceptKey(); + var copyElementDefinition = copyElement.getDefinition().asConceptKey(); + checkCompatibility(newDefinition, copyElementDefinition, automationComposition.getInstanceId()); + copyElement.setDefinition(element.getValue().getDefinition()); + } + } + // Remove element which is not present in the new Ac instance + var elementsRemoved = getElementRemoved(copyAc, automationComposition); + elementsRemoved.forEach(uuid -> copyAc.getElements().remove(uuid)); + + var validationResult = + validateAutomationComposition(copyAc, automationComposition.getCompositionTargetId()); + if (!validationResult.isValid()) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult()); + } + copyAc.setCompositionTargetId(automationComposition.getCompositionTargetId()); // Publish migrate event to the participants - supervisionAcHandler.migrate(acToBeUpdated, automationComposition.getCompositionTargetId()); + supervisionAcHandler.migratePrecheck(copyAc); - automationComposition = automationCompositionProvider.updateAutomationComposition(acToBeUpdated); - return createInstantiationResponse(automationComposition); + AcmUtils.setCascadedState(acToBeUpdated, DeployState.DEPLOYED, LockState.LOCKED, + SubState.MIGRATION_PRECHECKING); + acToBeUpdated.setStateChangeResult(StateChangeResult.NO_ERROR); + + return createInstantiationResponse(automationCompositionProvider.updateAutomationComposition(acToBeUpdated)); } private BeanValidationResult validateAutomationComposition(AutomationComposition automationComposition) { @@ -256,16 +332,10 @@ public class AutomationCompositionInstantiationProvider { ValidationStatus.INVALID, "Commissioned automation composition definition not primed")); return result; } - if (acDefinitionOpt.get().getRestarting() != null) { - result.addResult( - new ObjectValidationResult("ServiceTemplate.restarting", acDefinitionOpt.get().getRestarting(), - ValidationStatus.INVALID, "There is a restarting process in composition")); - return result; - } var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); result.addResult(AcmUtils.validateAutomationComposition(automationComposition, acDefinitionOpt.get().getServiceTemplate(), @@ -296,7 +366,7 @@ public class AutomationCompositionInstantiationProvider { var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId); if (!compositionId.equals(automationComposition.getCompositionId()) && !compositionId.equals(automationComposition.getCompositionTargetId())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId); } return automationComposition; @@ -312,26 +382,23 @@ public class AutomationCompositionInstantiationProvider { public InstantiationResponse deleteAutomationComposition(UUID compositionId, UUID instanceId) { var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId); if (!compositionId.equals(automationComposition.getCompositionId())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId); } if (!DeployState.UNDEPLOYED.equals(automationComposition.getDeployState()) && !DeployState.DELETING.equals(automationComposition.getDeployState())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Automation composition state is still " + automationComposition.getDeployState()); } if (DeployState.DELETING.equals(automationComposition.getDeployState()) && StateChangeResult.NO_ERROR.equals(automationComposition.getStateChangeResult())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Automation composition state is still " + automationComposition.getDeployState()); } - if (automationComposition.getRestarting() != null) { - throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Delete not allowed"); - } var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); supervisionAcHandler.delete(automationComposition, acDefinition); var response = new InstantiationResponse(); response.setInstanceId(automationComposition.getInstanceId()); @@ -366,7 +433,7 @@ public class AutomationCompositionInstantiationProvider { @Valid AcInstanceStateUpdate acInstanceStateUpdate) { var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId); if (!compositionId.equals(automationComposition.getCompositionId())) { - throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + throw new PfModelRuntimeException(Status.BAD_REQUEST, automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId); } var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); @@ -374,10 +441,11 @@ public class AutomationCompositionInstantiationProvider { var participantIds = acDefinition.getElementStateMap().values().stream() .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet()); - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(), - acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(), - automationComposition.getLockState(), automationComposition.getStateChangeResult()); + acInstanceStateUpdate.getLockOrder(), acInstanceStateUpdate.getSubOrder(), + automationComposition.getDeployState(), automationComposition.getLockState(), + automationComposition.getSubState(), automationComposition.getStateChangeResult()); switch (result) { case "DEPLOY": supervisionAcHandler.deploy(automationComposition, acDefinition); @@ -395,6 +463,14 @@ public class AutomationCompositionInstantiationProvider { supervisionAcHandler.unlock(automationComposition, acDefinition); break; + case "PREPARE": + supervisionAcHandler.prepare(automationComposition); + break; + + case "REVIEW": + supervisionAcHandler.review(automationComposition); + break; + default: throw new PfModelRuntimeException(Status.BAD_REQUEST, "Not valid " + acInstanceStateUpdate); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java index a0b6fe13e..67befe394 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java @@ -24,7 +24,7 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; -import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.parameters.topic.TopicParameterGroup; import org.onap.policy.common.parameters.validation.ParameterGroupConstraint; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; @@ -53,5 +53,5 @@ public class AcRuntimeParameterGroup { @Valid @NotNull - private Topics topics; + private Topics topics = new Topics(); } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java index d485a24ba..6e230d3df 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.acm.runtime.main.parameters; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import org.springframework.validation.annotation.Validated; @@ -27,6 +28,7 @@ import org.springframework.validation.annotation.Validated; @Setter @Validated @AllArgsConstructor +@NoArgsConstructor public class Topics { private String operationTopic; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java index 855681e69..1136dcb16 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation. + * Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,8 +41,7 @@ public class ParticipantController extends AbstractRestController implements Par @Override public ResponseEntity<ParticipantInformation> getParticipant(UUID participantId, UUID requestId) { - ParticipantInformation participantInformation = acmParticipantProvider - .getParticipantById(participantId); + var participantInformation = acmParticipantProvider.getParticipantById(participantId); return ResponseEntity.ok().body(participantInformation); } @@ -61,7 +60,7 @@ public class ParticipantController extends AbstractRestController implements Par @Override public ResponseEntity<List<ParticipantInformation>> queryParticipants(String name, String version, UUID requestId) { - List<ParticipantInformation> participantInformationList = acmParticipantProvider.getAllParticipants(); + var participantInformationList = acmParticipantProvider.getAllParticipants(); return ResponseEntity.ok().body(participantInformationList); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java index 13382e0fb..62ba7b017 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation. + * Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,10 @@ package org.onap.policy.clamp.acm.runtime.participants; -import jakarta.ws.rs.core.Response; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.MapUtils; @@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; -import org.onap.policy.models.base.PfModelRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -94,12 +90,11 @@ public class AcmParticipantProvider { * @param participantId The UUID of the participant to send request to */ public void sendParticipantStatusRequest(UUID participantId) { - var participant = this.participantProvider.getParticipantById(participantId); + // check if participant is present + this.participantProvider.getParticipantById(participantId); LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq"); participantStatusReqPublisher.send(participantId); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.updateParticipant(participant); } /** @@ -110,22 +105,6 @@ public class AcmParticipantProvider { this.participantStatusReqPublisher.send((UUID) null); } - /** - * Verify Participant state. - * - * @param participantIds The list of UUIDs of the participants to get - * @throws PfModelRuntimeException in case the participant is offline - */ - public void verifyParticipantState(Set<UUID> participantIds) { - for (UUID participantId : participantIds) { - var participant = this.participantProvider.getParticipantById(participantId); - if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) { - throw new PfModelRuntimeException(Response.Status.CONFLICT, - "Participant: " + participantId + " is OFFLINE"); - } - } - } - private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) { var automationCompositionElements = participantProvider .getAutomationCompositionElements(participantId); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 802c6603b..4f564478f 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -23,27 +23,30 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import io.opentelemetry.context.Context; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; -import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.clamp.models.acm.concepts.SubState; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; +import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -58,12 +61,15 @@ public class SupervisionAcHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class); private final AutomationCompositionProvider automationCompositionProvider; + private final AcDefinitionProvider acDefinitionProvider; // Publishers for participant communication private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AcElementPropertiesPublisher acElementPropertiesPublisher; private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; + private final AcPreparePublisher acPreparePublisher; private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1)); @@ -146,6 +152,30 @@ public class SupervisionAcHandler { } /** + * Handle prepare Pre Deploy an AutomationComposition instance. + * + * @param automationComposition the AutomationComposition + */ + public void prepare(AutomationComposition automationComposition) { + AcmUtils.setCascadedState(automationComposition, DeployState.UNDEPLOYED, LockState.NONE, SubState.PREPARING); + automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); + automationCompositionProvider.updateAutomationComposition(automationComposition); + executor.execute(() -> acPreparePublisher.sendPrepare(automationComposition)); + } + + /** + * Handle prepare Post Deploy an AutomationComposition instance. + * + * @param automationComposition the AutomationComposition + */ + public void review(AutomationComposition automationComposition) { + AcmUtils.setCascadedState(automationComposition, DeployState.DEPLOYED, LockState.LOCKED, SubState.REVIEWING); + automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); + automationCompositionProvider.updateAutomationComposition(automationComposition); + executor.execute(() -> acPreparePublisher.sendRevew(automationComposition)); + } + + /** * Handle Lock an AutomationComposition instance. * * @param automationComposition the AutomationComposition @@ -227,10 +257,14 @@ public class SupervisionAcHandler { } private void setAcElementStateInDb(AutomationCompositionDeployAck automationCompositionAckMessage) { + if (!validateMessage(automationCompositionAckMessage)) { + return; + } + var automationCompositionOpt = automationCompositionProvider .findAutomationComposition(automationCompositionAckMessage.getAutomationCompositionId()); if (automationCompositionOpt.isEmpty()) { - LOGGER.warn("AutomationComposition not found in database {}", + LOGGER.error("AutomationComposition not found in database {}", automationCompositionAckMessage.getAutomationCompositionId()); return; } @@ -239,14 +273,7 @@ public class SupervisionAcHandler { if (automationCompositionAckMessage.getAutomationCompositionResultMap() == null || automationCompositionAckMessage.getAutomationCompositionResultMap().isEmpty()) { if (DeployState.DELETING.equals(automationComposition.getDeployState())) { - // scenario when Automation Composition instance has never been deployed - for (var element : automationComposition.getElements().values()) { - if (element.getParticipantId().equals(automationCompositionAckMessage.getParticipantId())) { - element.setDeployState(DeployState.DELETED); - automationCompositionProvider.updateAutomationCompositionElement(element, - automationComposition.getInstanceId()); - } - } + deleteAcInstance(automationComposition, automationCompositionAckMessage.getParticipantId()); } else { LOGGER.warn("Empty AutomationCompositionResultMap {} {}", automationCompositionAckMessage.getAutomationCompositionId(), @@ -257,15 +284,51 @@ public class SupervisionAcHandler { var updated = updateState(automationComposition, automationCompositionAckMessage.getAutomationCompositionResultMap().entrySet(), - automationCompositionAckMessage.getStateChangeResult()); + automationCompositionAckMessage.getStateChangeResult(), automationCompositionAckMessage.getStage()); if (updated) { - automationCompositionProvider.updateAutomationComposition(automationComposition); + automationComposition = automationCompositionProvider.updateAcState(automationComposition); + var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); + participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition); + } + } + + private boolean validateMessage(AutomationCompositionDeployAck acAckMessage) { + if (acAckMessage.getAutomationCompositionId() == null + || acAckMessage.getStateChangeResult() == null) { + LOGGER.error("Not valid AutomationCompositionDeployAck message"); + return false; + } + if (!StateChangeResult.NO_ERROR.equals(acAckMessage.getStateChangeResult()) + && !StateChangeResult.FAILED.equals(acAckMessage.getStateChangeResult())) { + LOGGER.error("Not valid AutomationCompositionDeployAck message, stateChangeResult is not valid {} ", + acAckMessage.getStateChangeResult()); + return false; + } + + if (acAckMessage.getStage() == null) { + for (var el : acAckMessage.getAutomationCompositionResultMap().values()) { + if (AcmUtils.isInTransitionalState(el.getDeployState(), el.getLockState(), SubState.NONE)) { + LOGGER.error("Not valid AutomationCompositionDeployAck message, states are not valid"); + return false; + } + } + } + return true; + } + + private void deleteAcInstance(AutomationComposition automationComposition, UUID participantId) { + // scenario when Automation Composition instance has never been deployed + for (var element : automationComposition.getElements().values()) { + if (element.getParticipantId().equals(participantId)) { + element.setDeployState(DeployState.DELETED); + automationCompositionProvider.updateAutomationCompositionElement(element); + } } } private boolean updateState(AutomationComposition automationComposition, Set<Map.Entry<UUID, AcElementDeployAck>> automationCompositionResultSet, - StateChangeResult stateChangeResult) { + StateChangeResult stateChangeResult, Integer stage) { var updated = false; boolean inProgress = !StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult()); if (inProgress && !stateChangeResult.equals(automationComposition.getStateChangeResult())) { @@ -276,24 +339,14 @@ public class SupervisionAcHandler { for (var acElementAck : automationCompositionResultSet) { var element = automationComposition.getElements().get(acElementAck.getKey()); if (element != null) { - element.setMessage(acElementAck.getValue().getMessage()); - element.setOutProperties(acElementAck.getValue().getOutProperties()); - element.setOperationalState(acElementAck.getValue().getOperationalState()); - element.setUseState(acElementAck.getValue().getUseState()); + element.setMessage(AcmUtils.validatedMessage(acElementAck.getValue().getMessage())); + if (stage == null) { + element.setSubState(SubState.NONE); + } element.setDeployState(acElementAck.getValue().getDeployState()); element.setLockState(acElementAck.getValue().getLockState()); - element.setRestarting(null); - automationCompositionProvider.updateAutomationCompositionElement(element, - automationComposition.getInstanceId()); - } - } - - if (automationComposition.getRestarting() != null) { - var restarting = automationComposition.getElements().values().stream() - .map(AutomationCompositionElement::getRestarting).filter(Objects::nonNull).findAny(); - if (restarting.isEmpty()) { - automationComposition.setRestarting(null); - updated = true; + element.setStage(stage); + automationCompositionProvider.updateAutomationCompositionElement(element); } } @@ -304,12 +357,22 @@ public class SupervisionAcHandler { * Handle Migration of an AutomationComposition instance to other ACM Definition. * * @param automationComposition the AutomationComposition - * @param compositionTargetId the ACM Definition Id + * @param serviceTemplate the ServiceTemplate */ - public void migrate(AutomationComposition automationComposition, UUID compositionTargetId) { + public void migrate(AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) { AcmUtils.setCascadedState(automationComposition, DeployState.MIGRATING, LockState.LOCKED); + var stage = ParticipantUtils.getFirstStage(automationComposition, serviceTemplate); automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); - executor.execute( - () -> acCompositionMigrationPublisher.send(automationComposition, compositionTargetId)); + automationComposition.setPhase(stage); + executor.execute(() -> acCompositionMigrationPublisher.send(automationComposition, stage)); + } + + /** + * Handle Migration precheck of an AutomationComposition instance to other ACM Definition. + * + * @param automationComposition the AutomationComposition + */ + public void migratePrecheck(AutomationComposition automationComposition) { + executor.execute(() -> acCompositionMigrationPublisher.send(automationComposition, 0)); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 8f3a4c2eb..9ef979f8e 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class); private final SupervisionScanner supervisionScanner; - private final SupervisionPartecipantScanner partecipantScanner; + private final SupervisionParticipantScanner participantScanner; - private ThreadPoolExecutor executor = + private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @Scheduled( @@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable { private void executeScan() { supervisionScanner.run(); - partecipantScanner.run(); + participantScanner.run(); } /** diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java index 963e4830e..f13f5da2c 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java @@ -23,12 +23,14 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; import lombok.AllArgsConstructor; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; +import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -43,7 +45,7 @@ public class SupervisionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class); private final AcDefinitionProvider acDefinitionProvider; - private final AcRuntimeParameterGroup acRuntimeParameterGroup; + private final ParticipantSyncPublisher participantSyncPublisher; /** * Handle a ParticipantPrimeAck message from a participant. @@ -52,6 +54,22 @@ public class SupervisionHandler { */ @Timed(value = "listener.participant_prime_ack", description = "PARTICIPANT_PRIME_ACK messages received") public void handleParticipantMessage(ParticipantPrimeAck participantPrimeAckMessage) { + if (participantPrimeAckMessage.getCompositionId() == null + || participantPrimeAckMessage.getCompositionState() == null + || participantPrimeAckMessage.getStateChangeResult() == null) { + LOGGER.error("Not valid ParticipantPrimeAck message"); + return; + } + if (AcTypeState.PRIMING.equals(participantPrimeAckMessage.getCompositionState()) + || AcTypeState.DEPRIMING.equals(participantPrimeAckMessage.getCompositionState())) { + LOGGER.error("Not valid state {}", participantPrimeAckMessage.getCompositionState()); + return; + } + if (!StateChangeResult.NO_ERROR.equals(participantPrimeAckMessage.getStateChangeResult()) + && !StateChangeResult.FAILED.equals(participantPrimeAckMessage.getStateChangeResult())) { + LOGGER.error("Vot valid stateChangeResult {} ", participantPrimeAckMessage.getStateChangeResult()); + return; + } var acDefinitionOpt = acDefinitionProvider.findAcDefinition(participantPrimeAckMessage.getCompositionId()); if (acDefinitionOpt.isEmpty()) { LOGGER.warn("AC Definition not found in database {}", participantPrimeAckMessage.getCompositionId()); @@ -59,7 +77,7 @@ public class SupervisionHandler { } var acDefinition = acDefinitionOpt.get(); if (!AcTypeState.PRIMING.equals(acDefinition.getState()) - && !AcTypeState.DEPRIMING.equals(acDefinition.getState()) && acDefinition.getRestarting() == null) { + && !AcTypeState.DEPRIMING.equals(acDefinition.getState())) { LOGGER.error("AC Definition {} already primed/deprimed with participant {}", participantPrimeAckMessage.getCompositionId(), participantPrimeAckMessage.getParticipantId()); return; @@ -80,20 +98,11 @@ public class SupervisionHandler { } boolean completed = true; - boolean restarting = false; for (var element : acDefinition.getElementStateMap().values()) { - if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { - element.setMessage(participantPrimeAckMessage.getMessage()); - element.setState(participantPrimeAckMessage.getCompositionState()); - element.setRestarting(null); - acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId()); - } + handlePrimeAckElement(participantPrimeAckMessage, element); if (!finalState.equals(element.getState())) { completed = false; } - if (element.getRestarting() != null) { - restarting = true; - } } if (inProgress && !msgInErrors && completed) { @@ -103,13 +112,20 @@ public class SupervisionHandler { acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); } } - if (!restarting && acDefinition.getRestarting() != null) { - toUpdate = true; - acDefinition.setRestarting(null); - } if (toUpdate) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(), - acDefinition.getStateChangeResult(), acDefinition.getRestarting()); + acDefinition.getStateChangeResult()); + if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) { + participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId()); + } + } + } + + private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) { + if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) { + element.setMessage(AcmUtils.validatedMessage(participantPrimeAckMessage.getMessage())); + element.setState(participantPrimeAckMessage.getCompositionState()); + acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java index d1efb6ac0..3eb471609 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java @@ -21,7 +21,6 @@ package org.onap.policy.clamp.acm.runtime.supervision; import io.micrometer.core.annotation.Timed; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,15 +30,16 @@ import org.apache.commons.collections4.MapUtils; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher; -import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType; -import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus; @@ -64,7 +64,7 @@ public class SupervisionParticipantHandler { private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher; private final AutomationCompositionProvider automationCompositionProvider; private final AcDefinitionProvider acDefinitionProvider; - private final ParticipantRestartPublisher participantRestartPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -72,24 +72,14 @@ public class SupervisionParticipantHandler { * * @param participantRegisterMsg the ParticipantRegister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received") public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) { - var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - checkOnline(participant); - handleRestart(participant.getParticipantId()); - } else { - var participant = createParticipant(participantRegisterMsg.getParticipantId(), - listToMap(participantRegisterMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - - } + saveIfNotPresent(participantRegisterMsg.getReplicaId(), + participantRegisterMsg.getParticipantId(), + participantRegisterMsg.getParticipantSupportedElementType(), true); participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(), - participantRegisterMsg.getParticipantId()); + participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId()); } /** @@ -97,15 +87,13 @@ public class SupervisionParticipantHandler { * * @param participantDeregisterMsg the ParticipantDeregister message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received") public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) { - var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId()); - - if (participantOpt.isPresent()) { - var participant = participantOpt.get(); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.updateParticipant(participant); + var replicaId = participantDeregisterMsg.getReplicaId() != null + ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId(); + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + participantProvider.deleteParticipantReplica(replicaId); } participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId()); @@ -116,32 +104,68 @@ public class SupervisionParticipantHandler { * * @param participantStatusMsg the ParticipantStatus message received from a participant */ - @MessageIntercept @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received") public void handleParticipantMessage(ParticipantStatus participantStatusMsg) { + saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(), + participantStatusMsg.getParticipantSupportedElementType(), false); - var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId()); - if (participantOpt.isEmpty()) { - var participant = createParticipant(participantStatusMsg.getParticipantId(), - listToMap(participantStatusMsg.getParticipantSupportedElementType())); - participantProvider.saveParticipant(participant); - } else { - checkOnline(participantOpt.get()); - } if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) { - automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList()); + updateAcOutProperties(participantStatusMsg.getAutomationCompositionInfoList(), + participantStatusMsg.getCompositionId()); } if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty() && participantStatusMsg.getCompositionId() != null) { updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(), - participantStatusMsg.getParticipantDefinitionUpdates()); + participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates()); } } - private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) { - var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId); + private void saveIfNotPresent(UUID msgReplicaId, UUID participantId, + List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) { + var replicaId = msgReplicaId != null ? msgReplicaId : participantId; + var replicaOpt = participantProvider.findParticipantReplica(replicaId); + if (replicaOpt.isPresent()) { + var replica = replicaOpt.get(); + checkOnline(replica); + } else { + var participant = getParticipant(participantId, listToMap(participantSupportedElementType)); + participant.getReplicas().put(replicaId, createReplica(replicaId)); + participantProvider.saveParticipant(participant); + } + if (registration) { + handleRestart(participantId, replicaId); + } + } + + private Participant getParticipant(UUID participantId, + Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) { + var participantOpt = participantProvider.findParticipant(participantId); + return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType)); + } + + private ParticipantReplica createReplica(UUID replicaId) { + var replica = new ParticipantReplica(); + replica.setReplicaId(replicaId); + replica.setParticipantState(ParticipantState.ON_LINE); + replica.setLastMsg(TimestampHelper.now()); + return replica; + + } + + private void updateAcOutProperties(List<AutomationCompositionInfo> automationCompositionInfoList, + UUID compositionId) { + automationCompositionProvider.upgradeStates(automationCompositionInfoList); + var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); + for (var acInfo : automationCompositionInfoList) { + var ac = automationCompositionProvider.getAutomationComposition(acInfo.getAutomationCompositionId()); + participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), ac); + } + } + + private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) { + var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId); if (acDefinitionOpt.isEmpty()) { - LOGGER.error("Ac Definition with id {} not found", composotionId); + LOGGER.error("Ac Definition with id {} not found", compositionId); return; } var acDefinition = acDefinitionOpt.get(); @@ -155,71 +179,47 @@ public class SupervisionParticipantHandler { } acDefinitionProvider.updateAcDefinition(acDefinition, acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); + participantSyncPublisher.sendSync(acDefinition, replicaId); } - private void checkOnline(Participant participant) { - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - participant.setParticipantState(ParticipantState.ON_LINE); + private void checkOnline(ParticipantReplica replica) { + if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) { + replica.setParticipantState(ParticipantState.ON_LINE); } - participant.setLastMsg(TimestampHelper.now()); - participantProvider.saveParticipant(participant); + replica.setLastMsg(TimestampHelper.now()); + participantProvider.saveParticipantReplica(replica); } - private void handleRestart(UUID participantId) { + private void handleRestart(UUID participantId, UUID replicaId) { var compositionIds = participantProvider.getCompositionIds(participantId); for (var compositionId : compositionIds) { var acDefinition = acDefinitionProvider.getAcDefinition(compositionId); LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId()); - handleRestart(participantId, acDefinition); + handleSyncRestart(participantId, replicaId, acDefinition); } } - private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) { + private void handleSyncRestart(final UUID participantId, UUID replicaId, + AutomationCompositionDefinition acDefinition) { if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId()); return; } LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId()); - for (var elementState : acDefinition.getElementStateMap().values()) { - if (participantId.equals(elementState.getParticipantId())) { - elementState.setRestarting(true); - } - } var automationCompositionList = automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId()); - List<AutomationComposition> automationCompositions = new ArrayList<>(); - for (var automationComposition : automationCompositionList) { - if (isAcToBeRestarted(participantId, automationComposition)) { - automationCompositions.add(automationComposition); - } - } - // expected final state - if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) { - acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR); - } - acDefinition.setRestarting(true); - acDefinitionProvider.updateAcDefinition(acDefinition, - acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName()); - participantRestartPublisher.send(participantId, acDefinition, automationCompositions); + var automationCompositions = automationCompositionList.stream() + .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList(); + participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions); } - private boolean isAcToBeRestarted(UUID participantId, AutomationComposition automationComposition) { - boolean toAdd = false; + private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) { for (var element : automationComposition.getElements().values()) { if (participantId.equals(element.getParticipantId())) { - element.setRestarting(true); - toAdd = true; - } - } - if (toAdd) { - automationComposition.setRestarting(true); - // expected final state - if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) { - automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); + return true; } - automationCompositionProvider.updateAutomationComposition(automationComposition); } - return toAdd; + return false; } private Participant createParticipant(UUID participantId, @@ -227,8 +227,6 @@ public class SupervisionParticipantHandler { var participant = new Participant(); participant.setParticipantId(participantId); participant.setParticipantSupportedElementTypes(participantSupportedElementType); - participant.setParticipantState(ParticipantState.ON_LINE); - participant.setLastMsg(TimestampHelper.now()); return participant; } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java index 4d2a22f26..4ada199b6 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java @@ -21,8 +21,7 @@ package org.onap.policy.clamp.acm.runtime.supervision; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.models.acm.concepts.Participant; -import org.onap.policy.clamp.models.acm.concepts.ParticipantState; +import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider; import org.onap.policy.clamp.models.acm.utils.TimestampHelper; import org.slf4j.Logger; @@ -33,20 +32,20 @@ import org.springframework.stereotype.Component; * This class is used to scan the automation compositions in the database and check if they are in the correct state. */ @Component -public class SupervisionPartecipantScanner { - private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class); +public class SupervisionParticipantScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class); private final long maxWaitMs; private final ParticipantProvider participantProvider; /** - * Constructor for instantiating SupervisionPartecipantScanner. + * Constructor for instantiating SupervisionParticipantScanner. * * @param participantProvider the Participant Provider * @param acRuntimeParameterGroup the parameters for the automation composition runtime */ - public SupervisionPartecipantScanner(final ParticipantProvider participantProvider, + public SupervisionParticipantScanner(final ParticipantProvider participantProvider, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.participantProvider = participantProvider; this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); @@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner { * Run Scanning. */ public void run() { - LOGGER.debug("Scanning participans in the database . . ."); - - for (var participant : participantProvider.getParticipants()) { - scanParticipantStatus(participant); - } - - LOGGER.debug("Participans scan complete . . ."); + LOGGER.debug("Scanning participants in the database . . ."); + participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus); + LOGGER.debug("Participants scan complete . . ."); } - private void scanParticipantStatus(Participant participant) { - var id = participant.getParticipantId(); - if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) { - LOGGER.debug("report Participant is still OFF_LINE {}", id); - return; - } + private void scanParticipantReplicaStatus(ParticipantReplica replica) { var now = TimestampHelper.nowEpochMilli(); - var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg()); + var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg()); if ((now - lastMsg) > maxWaitMs) { - LOGGER.debug("report Participant OFF_LINE {}", id); - participant.setParticipantState(ParticipantState.OFF_LINE); - participantProvider.updateParticipant(participant); + LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId()); + participantProvider.deleteParticipantReplica(replica.getReplicaId()); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java index 06d464671..db67e5eea 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java @@ -22,17 +22,21 @@ package org.onap.policy.clamp.acm.runtime.supervision; +import java.util.Comparator; import java.util.HashMap; import java.util.UUID; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; +import org.onap.policy.clamp.models.acm.concepts.SubState; import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider; import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider; import org.onap.policy.clamp.models.acm.utils.AcmUtils; @@ -55,6 +59,8 @@ public class SupervisionScanner { private final AcDefinitionProvider acDefinitionProvider; private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher; private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher; + private final ParticipantSyncPublisher participantSyncPublisher; + private final AutomationCompositionMigrationPublisher automationCompositionMigrationPublisher; /** * Constructor for instantiating SupervisionScanner. @@ -69,11 +75,15 @@ public class SupervisionScanner { final AcDefinitionProvider acDefinitionProvider, final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher, final AutomationCompositionDeployPublisher automationCompositionDeployPublisher, + final ParticipantSyncPublisher participantSyncPublisher, + final AutomationCompositionMigrationPublisher automationCompositionMigrationPublisher, final AcRuntimeParameterGroup acRuntimeParameterGroup) { this.automationCompositionProvider = automationCompositionProvider; this.acDefinitionProvider = acDefinitionProvider; this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher; this.automationCompositionDeployPublisher = automationCompositionDeployPublisher; + this.participantSyncPublisher = participantSyncPublisher; + this.automationCompositionMigrationPublisher = automationCompositionMigrationPublisher; this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs(); } @@ -91,11 +101,9 @@ public class SupervisionScanner { var acList = automationCompositionProvider.getAcInstancesInTransition(); HashMap<UUID, AutomationCompositionDefinition> acDefinitionMap = new HashMap<>(); for (var automationComposition : acList) { - var acDefinition = acDefinitionMap.get(automationComposition.getCompositionId()); - if (acDefinition == null) { - acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()); - acDefinitionMap.put(acDefinition.getCompositionId(), acDefinition); - } + var compositionId = automationComposition.getCompositionTargetId() != null + ? automationComposition.getCompositionTargetId() : automationComposition.getCompositionId(); + var acDefinition = acDefinitionMap.computeIfAbsent(compositionId, acDefinitionProvider::getAcDefinition); scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate()); } LOGGER.debug("Automation composition scan complete . . ."); @@ -113,11 +121,13 @@ public class SupervisionScanner { for (var element : acDefinition.getElementStateMap().values()) { if (!finalState.equals(element.getState())) { completed = false; + break; } } if (completed) { acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState, - StateChangeResult.NO_ERROR, null); + StateChangeResult.NO_ERROR); + participantSyncPublisher.sendSync(acDefinition, null); } else { handleTimeout(acDefinition); } @@ -128,14 +138,33 @@ public class SupervisionScanner { LOGGER.debug("scanning automation composition {} . . .", automationComposition.getInstanceId()); if (!AcmUtils.isInTransitionalState(automationComposition.getDeployState(), - automationComposition.getLockState()) + automationComposition.getLockState(), automationComposition.getSubState()) || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) { LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId()); - // Clear Timeout on automation composition return; } + if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) { + scanStage(automationComposition, serviceTemplate); + } else if (DeployState.UPDATING.equals(automationComposition.getDeployState()) + || SubState.PREPARING.equals(automationComposition.getSubState()) + || SubState.REVIEWING.equals(automationComposition.getSubState()) + || SubState.MIGRATION_PRECHECKING.equals(automationComposition.getSubState())) { + simpleScan(automationComposition, serviceTemplate); + } else { + scanWithPhase(automationComposition, serviceTemplate); + } + } + + /** + * Scan with startPhase: DEPLOY, UNDEPLOY, LOCK and UNLOCK. + * + * @param automationComposition the AutomationComposition + * @param serviceTemplate the ToscaServiceTemplate + */ + private void scanWithPhase(final AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { var completed = true; var minSpNotCompleted = 1000; // min startPhase not completed var maxSpNotCompleted = 0; // max startPhase not completed @@ -147,7 +176,8 @@ public class SupervisionScanner { int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties()); defaultMin = Math.min(defaultMin, startPhase); defaultMax = Math.max(defaultMax, startPhase); - if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) { + if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(), + element.getSubState())) { completed = false; minSpNotCompleted = Math.min(minSpNotCompleted, startPhase); maxSpNotCompleted = Math.max(maxSpNotCompleted, startPhase); @@ -155,35 +185,86 @@ public class SupervisionScanner { } if (completed) { - LOGGER.debug("automation composition scan: transition state {} {} completed", - automationComposition.getDeployState(), automationComposition.getLockState()); - - complete(automationComposition); + complete(automationComposition, serviceTemplate); } else { LOGGER.debug("automation composition scan: transition state {} {} not completed", automationComposition.getDeployState(), automationComposition.getLockState()); - if (DeployState.UPDATING.equals(automationComposition.getDeployState()) - || DeployState.MIGRATING.equals(automationComposition.getDeployState())) { - // UPDATING do not need phases - handleTimeoutUpdate(automationComposition); - return; - } - var isForward = AcmUtils.isForward(automationComposition.getDeployState(), automationComposition.getLockState()); var nextSpNotCompleted = isForward ? minSpNotCompleted : maxSpNotCompleted; if (nextSpNotCompleted != automationComposition.getPhase()) { - sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted, false); + sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted); + } else { + handleTimeout(automationComposition, serviceTemplate); + } + } + } + + /** + * Simple scan: UPDATE, PREPARE, REVIEW, MIGRATE_PRECHECKING. + * + * @param automationComposition the AutomationComposition + * @param serviceTemplate the ToscaServiceTemplate + */ + private void simpleScan(final AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) { + var completed = automationComposition.getElements().values().stream() + .filter(element -> AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(), + element.getSubState())).findFirst().isEmpty(); + + if (completed) { + complete(automationComposition, serviceTemplate); + } else { + handleTimeout(automationComposition, serviceTemplate); + } + } + + /** + * Scan with stage: MIGRATE. + * + * @param automationComposition the AutomationComposition + * @param serviceTemplate the ToscaServiceTemplate + */ + private void scanStage(final AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) { + var completed = true; + var minStageNotCompleted = 1000; // min stage not completed + for (var element : automationComposition.getElements().values()) { + if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(), + element.getSubState())) { + var toscaNodeTemplate = serviceTemplate.getToscaTopologyTemplate().getNodeTemplates() + .get(element.getDefinition().getName()); + var stageSet = ParticipantUtils.findStageSet(toscaNodeTemplate.getProperties()); + var minStage = stageSet.stream().min(Comparator.comparing(Integer::valueOf)).orElse(0); + int stage = element.getStage() != null ? element.getStage() : minStage; + minStageNotCompleted = Math.min(minStageNotCompleted, stage); + completed = false; + } + } + + if (completed) { + complete(automationComposition, serviceTemplate); + } else { + LOGGER.debug("automation composition scan: transition from state {} to {} not completed", + automationComposition.getDeployState(), automationComposition.getLockState()); + + if (minStageNotCompleted != automationComposition.getPhase()) { + savePhase(automationComposition, minStageNotCompleted); + LOGGER.debug("retry message AutomationCompositionMigration"); + automationCompositionMigrationPublisher.send(automationComposition, minStageNotCompleted); } else { - handleTimeoutWithPhase(automationComposition, serviceTemplate); + handleTimeout(automationComposition, serviceTemplate); } } } - private void complete(final AutomationComposition automationComposition) { + private void complete(final AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate) { + LOGGER.debug("automation composition scan: transition state {} {} {} completed", + automationComposition.getDeployState(), automationComposition.getLockState(), + automationComposition.getSubState()); + var deployState = automationComposition.getDeployState(); if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) { // migration scenario @@ -193,14 +274,18 @@ public class SupervisionScanner { automationComposition.setDeployState(AcmUtils.deployCompleted(deployState)); automationComposition.setLockState(AcmUtils.lockCompleted(deployState, automationComposition.getLockState())); automationComposition.setPhase(null); + automationComposition.setSubState(SubState.NONE); + automationComposition.setPrecheck(null); if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) { automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR); } + var acToUpdate = automationComposition; if (DeployState.DELETED.equals(automationComposition.getDeployState())) { automationCompositionProvider.deleteAutomationComposition(automationComposition.getInstanceId()); } else { - automationCompositionProvider.updateAutomationComposition(automationComposition); + acToUpdate = automationCompositionProvider.updateAcState(acToUpdate); } + participantSyncPublisher.sendSync(serviceTemplate, acToUpdate); } private void handleTimeout(AutomationCompositionDefinition acDefinition) { @@ -214,71 +299,48 @@ public class SupervisionScanner { LOGGER.debug("Report timeout for the ac definition {}", acDefinition.getCompositionId()); acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT); acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), - acDefinition.getState(), acDefinition.getStateChangeResult(), acDefinition.getRestarting()); - } - } - - private void handleTimeoutUpdate(AutomationComposition automationComposition) { - if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) { - LOGGER.debug("The ac instance is in timeout {}", automationComposition.getInstanceId()); - return; - } - var now = TimestampHelper.nowEpochMilli(); - var lastMsg = TimestampHelper.toEpochMilli(automationComposition.getLastMsg()); - for (var element : automationComposition.getElements().values()) { - if (!AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) { - continue; - } - if ((now - lastMsg) > maxStatusWaitMs) { - LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId()); - automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT); - automationCompositionProvider.updateAutomationComposition(automationComposition); - break; - } + acDefinition.getState(), acDefinition.getStateChangeResult()); + participantSyncPublisher.sendSync(acDefinition, null); } } - private void handleTimeoutWithPhase(AutomationComposition automationComposition, + private void handleTimeout(AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) { + LOGGER.debug("automation composition scan: transition from state {} to {} {} not completed", + automationComposition.getDeployState(), automationComposition.getLockState(), + automationComposition.getSubState()); + if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) { LOGGER.debug("The ac instance is in timeout {}", automationComposition.getInstanceId()); return; } - int currentPhase = automationComposition.getPhase(); var now = TimestampHelper.nowEpochMilli(); var lastMsg = TimestampHelper.toEpochMilli(automationComposition.getLastMsg()); - for (var element : automationComposition.getElements().values()) { - if (!AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) { - continue; - } - var toscaNodeTemplate = serviceTemplate.getToscaTopologyTemplate().getNodeTemplates() - .get(element.getDefinition().getName()); - int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties()); - if (currentPhase != startPhase) { - continue; - } - if ((now - lastMsg) > maxStatusWaitMs) { - LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId()); - automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT); - automationCompositionProvider.updateAutomationComposition(automationComposition); - break; - } + if ((now - lastMsg) > maxStatusWaitMs) { + LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId()); + automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT); + automationCompositionProvider.updateAcState(automationComposition); + participantSyncPublisher.sendSync(serviceTemplate, automationComposition); } } - private void sendAutomationCompositionMsg(AutomationComposition automationComposition, - ToscaServiceTemplate serviceTemplate, int startPhase, boolean firstStartPhase) { + private void savePhase(AutomationComposition automationComposition, int startPhase) { automationComposition.setLastMsg(TimestampHelper.now()); automationComposition.setPhase(startPhase); - automationCompositionProvider.updateAutomationComposition(automationComposition); + automationCompositionProvider.updateAcState(automationComposition); + } + + private void sendAutomationCompositionMsg(AutomationComposition automationComposition, + ToscaServiceTemplate serviceTemplate, int startPhase) { + savePhase(automationComposition, startPhase); if (DeployState.DEPLOYING.equals(automationComposition.getDeployState())) { - LOGGER.debug("retry message AutomationCompositionUpdate"); + LOGGER.debug("retry message AutomationCompositionDeploy"); automationCompositionDeployPublisher.send(automationComposition, serviceTemplate, startPhase, - firstStartPhase); + false); } else { LOGGER.debug("retry message AutomationCompositionStateChange"); - automationCompositionStateChangePublisher.send(automationComposition, startPhase, firstStartPhase); + automationCompositionStateChangePublisher.send(automationComposition, startPhase, false); } } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java index 5014f7dc3..19130c225 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java @@ -21,14 +21,11 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import jakarta.ws.rs.core.Response.Status; -import java.util.List; -import java.util.Optional; import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.client.TopicSinkClient; public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java index 5afb7eba4..d3222caf5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java @@ -21,14 +21,11 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import jakarta.ws.rs.core.Response.Status; -import java.util.List; -import java.util.Optional; import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.client.TopicSinkClient; public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher { diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java index 338f2960d..cc5d1461f 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java @@ -56,7 +56,7 @@ public class AcElementPropertiesPublisher extends AbstractParticipantPublisher<P propertiesUpdate.setParticipantUpdatesList( AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.UPDATE)); - LOGGER.debug("AC Element properties update sent {}", propertiesUpdate); + LOGGER.debug("AC Element properties update sent {}", propertiesUpdate.getMessageId()); super.send(propertiesUpdate); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java new file mode 100644 index 000000000..acf403595 --- /dev/null +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.runtime.supervision.comm; + +import io.micrometer.core.annotation.Timed; +import java.time.Instant; +import java.util.UUID; +import lombok.AllArgsConstructor; +import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; +import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionPrepare; +import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder; +import org.onap.policy.clamp.models.acm.utils.AcmUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +@AllArgsConstructor +public class AcPreparePublisher extends AbstractParticipantPublisher<AutomationCompositionPrepare> { + + private static final Logger LOGGER = LoggerFactory.getLogger(AcPreparePublisher.class); + + /** + * Send AutomationCompositionPrepare Prepare message to Participant. + * + * @param automationComposition the AutomationComposition + */ + @Timed(value = "publisher.prepare", description = "AC Prepare Pre Deploy published") + public void sendPrepare(AutomationComposition automationComposition) { + var acPrepare = createAutomationCompositionPrepare(automationComposition.getCompositionId(), + automationComposition.getInstanceId()); + acPrepare.setParticipantList( + AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.NONE)); + LOGGER.debug("AC Prepare sent {}", acPrepare); + super.send(acPrepare); + } + + /** + * Send AutomationCompositionPrepare Review message to Participant. + * + * @param automationComposition the AutomationComposition + */ + @Timed(value = "publisher.review", description = "AC Review Post Deploy published") + public void sendRevew(AutomationComposition automationComposition) { + var acPrepare = createAutomationCompositionPrepare(automationComposition.getCompositionId(), + automationComposition.getInstanceId()); + acPrepare.setPreDeploy(false); + LOGGER.debug("AC Review sent {}", acPrepare); + super.send(acPrepare); + } + + private AutomationCompositionPrepare createAutomationCompositionPrepare(UUID compositionId, UUID instanceId) { + var acPrepare = new AutomationCompositionPrepare(); + acPrepare.setCompositionId(compositionId); + acPrepare.setAutomationCompositionId(instanceId); + acPrepare.setMessageId(UUID.randomUUID()); + acPrepare.setTimestamp(Instant.now()); + return acPrepare; + } +} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java index 2b6435e67..7fe63a7c5 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java @@ -87,7 +87,7 @@ public class AutomationCompositionDeployPublisher extends AbstractParticipantPub acDeployMsg.setTimestamp(Instant.now()); acDeployMsg.setParticipantUpdatesList(participantDeploys); - LOGGER.debug("AutomationCompositionDeploy message sent {}", acDeployMsg); + LOGGER.debug("AutomationCompositionDeploy message sent {}", acDeployMsg.getMessageId()); super.send(acDeployMsg); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java index e26a5403b..572f7b1fe 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java @@ -36,20 +36,22 @@ public class AutomationCompositionMigrationPublisher * Send AutomationCompositionMigration message to Participant. * * @param automationComposition the AutomationComposition - * @param compositionTargetId the Composition Definition Target + * @param stage the stage to execute */ @Timed( value = "publisher.automation_composition_migration", description = "AUTOMATION_COMPOSITION_MIGRATION messages published") - public void send(AutomationComposition automationComposition, UUID compositionTargetId) { - var acsc = new AutomationCompositionMigration(); - acsc.setCompositionId(automationComposition.getCompositionId()); - acsc.setAutomationCompositionId(automationComposition.getInstanceId()); - acsc.setMessageId(UUID.randomUUID()); - acsc.setCompositionTargetId(compositionTargetId); - acsc.setParticipantUpdatesList( + public void send(AutomationComposition automationComposition, int stage) { + var acMigration = new AutomationCompositionMigration(); + acMigration.setPrecheck(Boolean.TRUE.equals(automationComposition.getPrecheck())); + acMigration.setCompositionId(automationComposition.getCompositionId()); + acMigration.setAutomationCompositionId(automationComposition.getInstanceId()); + acMigration.setMessageId(UUID.randomUUID()); + acMigration.setCompositionTargetId(automationComposition.getCompositionTargetId()); + acMigration.setStage(stage); + acMigration.setParticipantUpdatesList( AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.MIGRATE)); - super.send(acsc); + super.send(acMigration); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java index faca6a897..a0b74b574 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java index aceec0400..551a27c86 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java index a4e400263..1649c2543 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java index 55682d154..a6a3ab648 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java index 89763a2b6..360e526cc 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.UUID; import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; @@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class); private final ParticipantProvider participantProvider; - private final AcmParticipantProvider acmParticipantProvider; private final AcRuntimeParameterGroup acRuntimeParameterGroup; /** @@ -74,7 +72,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part message.setParticipantId(participantId); message.setTimestamp(Instant.now()); message.setParticipantDefinitionUpdates(participantDefinitions); - LOGGER.debug("Participant Update sent {}", message); + LOGGER.debug("Participant Update sent {}", message.getMessageId()); super.send(message); } @@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); participantIds.add(elementState.getParticipantId()); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, elementState.getParticipantId()); + supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId()); } } else { // scenario Prime participants not assigned yet @@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part for (var elementEntry : acElements) { var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); elementState.setState(AcTypeState.PRIMING); - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - var participantId = supportedElementMap.get(type); + var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue())); if (participantId != null) { elementState.setParticipantId(participantId); participantIds.add(participantId); } } } - acmParticipantProvider.verifyParticipantState(participantIds); + participantProvider.verifyParticipantState(participantIds); return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap); } @@ -133,7 +127,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part // DeCommission the automation composition but deleting participantdefinitions on participants message.setParticipantDefinitionUpdates(null); - LOGGER.debug("Participant Update sent {}", message); + LOGGER.debug("Participant Update sent {}", message.getMessageId()); super.send(message); } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java index d021c57a4..5e073080b 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java @@ -36,14 +36,21 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli * * @param responseTo the original request id in the request. * @param participantId the participant Id + * @param replicaId the participant replica Id */ @Timed(value = "publisher.participant_register_ack", description = "PARTICIPANT_REGISTER_ACK messages published") - public void send(UUID responseTo, UUID participantId) { + public void send(UUID responseTo, UUID participantId, UUID replicaId) { var message = new ParticipantRegisterAck(); message.setParticipantId(participantId); + message.setReplicaId(replicaId); message.setResponseTo(responseTo); message.setMessage("Participant Register Ack"); message.setResult(true); super.send(message); } + + @Override + public boolean isDefaultTopic() { + return false; + } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java index 3b7db1c7d..d78cec7db 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java deleted file mode 100644 index 4f28eab8e..000000000 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java +++ /dev/null @@ -1,117 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.acm.runtime.supervision.comm; - -import io.micrometer.core.annotation.Timed; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; -import lombok.AllArgsConstructor; -import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; -import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; -import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition; -import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; -import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart; -import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; -import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -@Component -@AllArgsConstructor -public class ParticipantRestartPublisher extends AbstractParticipantPublisher<ParticipantRestart> { - - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRestartPublisher.class); - private final AcRuntimeParameterGroup acRuntimeParameterGroup; - - /** - * Send Restart to Participant. - * - * @param participantId the ParticipantId - * @param acmDefinition the AutomationComposition Definition - * @param automationCompositions the list of automationCompositions - */ - @Timed(value = "publisher.participant_restart", description = "Participant Restart published") - public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, - List<AutomationComposition> automationCompositions) { - - var message = new ParticipantRestart(); - message.setParticipantId(participantId); - message.setCompositionId(acmDefinition.getCompositionId()); - message.setMessageId(UUID.randomUUID()); - message.setTimestamp(Instant.now()); - message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); - var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); - - for (var automationComposition : automationCompositions) { - var restartAc = new ParticipantRestartAc(); - restartAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementRestart = AcmUtils.createAcElementRestart(element); - acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - restartAc.getAcElementList().add(acElementRestart); - } - } - message.getAutomationcompositionList().add(restartAc); - } - - LOGGER.debug("Participant Restart sent {}", message); - super.send(message); - } - - protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId, - AutomationCompositionDefinition acmDefinition) { - var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(), - acRuntimeParameterGroup.getAcmParameters().getToscaElementName()); - - // list of entry filtered by participantId - List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>(); - Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>(); - for (var elementEntry : acElements) { - var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey()); - if (participantId.equals(elementState.getParticipantId())) { - var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(), - elementEntry.getValue().getTypeVersion()); - supportedElementMap.put(type, participantId); - elementList.add(elementEntry); - } - } - var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap); - for (var participantDefinition : list) { - for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) { - var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName()); - if (state != null) { - elementDe.setOutProperties(state.getOutProperties()); - } - } - } - return list; - } -} diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java index 700e61df3..0a7a5eb93 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java @@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener; import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java index 76feee72f..96abac494 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -40,11 +40,16 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher< */ @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published") public void send(UUID participantId) { - ParticipantStatusReq message = new ParticipantStatusReq(); + var message = new ParticipantStatusReq(); message.setParticipantId(participantId); message.setTimestamp(Instant.now()); - LOGGER.debug("Participant StatusReq sent {}", message); + LOGGER.debug("Participant StatusReq sent {}", message.getMessageId()); super.send(message); } + + @Override + public boolean isDefaultTopic() { + return false; + } } diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index ae7eda1ee..d90b6f667 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm; import io.micrometer.core.annotation.Timed; import java.time.Instant; import java.util.List; -import java.util.Optional; import java.util.UUID; +import lombok.AllArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.main.parameters.Topics; +import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync; import org.onap.policy.clamp.models.acm.utils.AcmUtils; -import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; - @Component -public class ParticipantSyncPublisher extends ParticipantRestartPublisher { +@AllArgsConstructor +public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> { private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class); - private final AcRuntimeParameterGroup acRuntimeParameterGroup; - public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) { - super(acRuntimeParameterGroup); - this.acRuntimeParameterGroup = acRuntimeParameterGroup; - } - - /** - * Send sync msg to Participant. + * Send Restart sync msg to Participant by participantId. * - * @param participantId the ParticipantId + * @param participantId the participantId + * @param replicaId the replicaId * @param acmDefinition the AutomationComposition Definition * @param automationCompositions the list of automationCompositions */ - @Override @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") - public void send(UUID participantId, AutomationCompositionDefinition acmDefinition, + public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition, List<AutomationComposition> automationCompositions) { var message = new ParticipantSync(); message.setParticipantId(participantId); + message.setReplicaId(replicaId); + message.setRestarting(true); message.setCompositionId(acmDefinition.getCompositionId()); message.setMessageId(UUID.randomUUID()); message.setTimestamp(Instant.now()); message.setState(acmDefinition.getState()); - message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition)); + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate()); for (var automationComposition : automationCompositions) { - var syncAc = new ParticipantRestartAc(); - syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); - for (var element : automationComposition.getElements().values()) { - if (participantId.equals(element.getParticipantId())) { - var acElementSync = AcmUtils.createAcElementRestart(element); - acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); - syncAc.getAcElementList().add(acElementSync); - } - } + var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment); message.getAutomationcompositionList().add(syncAc); } - LOGGER.debug("Participant Sync sent {}", message); + LOGGER.debug("Participant Restarting Sync sent {}", message); super.send(message); } @@ -98,4 +87,66 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher { return false; } + /** + * Send AutomationCompositionDefinition sync msg to all Participants. + * + * @param acDefinition the AutomationComposition Definition + * @param excludeReplicaId the replica to be excluded + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) { + var message = new ParticipantSync(); + message.setCompositionId(acDefinition.getCompositionId()); + if (excludeReplicaId != null) { + message.getExcludeReplicas().add(excludeReplicaId); + } + message.setState(acDefinition.getState()); + message.setStateChangeResult(acDefinition.getStateChangeResult()); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) { + message.setDelete(true); + } else { + message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition, + acRuntimeParameterGroup.getAcmParameters().getToscaElementName())); + } + LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message); + super.send(message); + } + + /** + * Send AutomationComposition sync msg to all Participants. + * + * @param serviceTemplate the ServiceTemplate + * @param automationComposition the automationComposition + */ + @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published") + public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) { + var message = new ParticipantSync(); + message.setCompositionId(automationComposition.getCompositionId()); + message.setAutomationCompositionId(automationComposition.getInstanceId()); + message.setState(AcTypeState.PRIMED); + message.setMessageId(UUID.randomUUID()); + message.setTimestamp(Instant.now()); + var syncAc = new ParticipantRestartAc(); + syncAc.setAutomationCompositionId(automationComposition.getInstanceId()); + syncAc.setDeployState(automationComposition.getDeployState()); + syncAc.setLockState(automationComposition.getLockState()); + syncAc.setStateChangeResult(automationComposition.getStateChangeResult()); + if (DeployState.DELETED.equals(automationComposition.getDeployState())) { + message.setDelete(true); + } else { + var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate); + for (var element : automationComposition.getElements().values()) { + var acElementSync = AcmUtils.createAcElementRestart(element); + acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment); + syncAc.getAcElementList().add(acElementSync); + + } + } + message.getAutomationcompositionList().add(syncAc); + + LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId()); + super.send(message); + } } diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml index 0e2585dba..bca01ace8 100644 --- a/runtime-acm/src/main/resources/application.yaml +++ b/runtime-acm/src/main/resources/application.yaml @@ -44,21 +44,18 @@ runtime: maxStatusWaitMs: 200000 topicParameterGroup: topicSources: - - - topic: ${runtime.topics.operationTopic} + - topic: ${runtime.topics.operationTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP fetchTimeout: 15000 topicSinks: - - - topic: ${runtime.topics.operationTopic} + - topic: ${runtime.topics.operationTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP - - - topic: ${runtime.topics.syncTopic} + - topic: ${runtime.topics.syncTopic} servers: - ${topicServer:kafka:9092} topicCommInfrastructure: NOOP |