aboutsummaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'runtime-acm/src/main')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java10
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java8
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java20
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java11
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java202
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java27
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java131
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java52
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java160
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java (renamed from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java)35
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java200
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java78
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java20
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java16
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java9
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java117
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java2
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java9
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java105
-rw-r--r--runtime-acm/src/main/resources/application.yaml9
35 files changed, 734 insertions, 544 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
index 74ccb9cc6..48b139495 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -56,7 +56,7 @@ public class CommissioningProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionProvider acProvider;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcTypeStateResolver acTypeStateResolver;
private final ParticipantPrimePublisher participantPrimePublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
@@ -188,10 +188,6 @@ public class CommissioningProvider {
throw new PfModelRuntimeException(Status.BAD_REQUEST, "There are instances, Priming/Depriming not allowed");
}
var acmDefinition = acDefinitionProvider.getAcDefinition(compositionId);
- if (acmDefinition.getRestarting() != null) {
- throw new PfModelRuntimeException(Status.BAD_REQUEST,
- "There is a restarting process, Priming/Depriming not allowed");
- }
var stateOrdered = acTypeStateResolver.resolve(acTypeStateUpdate.getPrimeOrder(), acmDefinition.getState(),
acmDefinition.getStateChangeResult());
switch (stateOrdered) {
@@ -229,7 +225,7 @@ public class CommissioningProvider {
}
}
if (!participantIds.isEmpty()) {
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
}
acmDefinition.setState(AcTypeState.DEPRIMING);
acmDefinition.setLastMsg(TimestampHelper.now());
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java
index 084f7c774..f5c8368e2 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/MetricsConfiguration.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.config;
import io.micrometer.core.aop.TimedAspect;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -34,8 +35,9 @@ public class MetricsConfiguration {
* Load up the metrics registry.
*/
@Bean
- public InitializingBean forcePrometheusPostProcessor(BeanPostProcessor meterRegistryPostProcessor,
- MeterRegistry registry) {
+ public InitializingBean forcePrometheusPostProcessor(@Qualifier("meterRegistryPostProcessor")
+ BeanPostProcessor meterRegistryPostProcessor,
+ MeterRegistry registry) {
return () -> meterRegistryPostProcessor.postProcessAfterInitialization(registry, "");
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java
index 3727333a4..05d47d4f7 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/OpenTelConfiguration.java
@@ -35,7 +35,7 @@ import org.springframework.context.annotation.Configuration;
public class OpenTelConfiguration {
@Bean
- @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false)
+ @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true")
@ConditionalOnExpression("'http'.equals('${tracing.exporter.protocol}')")
OtlpHttpSpanExporter otlpHttpSpanExporter(@Value("${tracing.exporter.endpoint:http://jaeger:4318/v1/traces}") String url) {
return OtlpHttpSpanExporter.builder()
@@ -44,7 +44,7 @@ public class OpenTelConfiguration {
}
@Bean
- @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false)
+ @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true")
@ConditionalOnExpression("'grpc'.equals('${tracing.exporter.protocol}')")
OtlpGrpcSpanExporter otlpGrpcSpanExporter(@Value("${tracing.exporter.endpoint:http://jaeger:4317}") String url) {
return OtlpGrpcSpanExporter.builder()
@@ -53,7 +53,7 @@ public class OpenTelConfiguration {
}
@Bean
- @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true", matchIfMissing = false)
+ @ConditionalOnProperty(prefix = "tracing", name = "enabled", havingValue = "true")
JaegerRemoteSampler jaegerRemoteSampler(
@Value("${tracing.sampler.jaeger-remote.endpoint:http://jaeger:14250}") String url,
@Value("${SERVICE_ID:unknown_service}") String serviceId) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java
index 2e75db12e..9d50fc739 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/SecurityConfig.java
@@ -45,16 +45,16 @@ public class SecurityConfig {
*/
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
- http
- .httpBasic(Customizer.withDefaults())
- .authorizeHttpRequests(authorize -> {
- if (useBasicAuth) {
- authorize.anyRequest().authenticated();
- } else {
- authorize.anyRequest().permitAll();
- }
- })
- .csrf(AbstractHttpConfigurer::disable);
+ if (useBasicAuth) {
+ http
+ .httpBasic(Customizer.withDefaults())
+ .authorizeHttpRequests(authorize -> authorize.anyRequest().authenticated());
+ } else {
+ http
+ .authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll());
+ }
+
+ http.csrf(AbstractHttpConfigurer::disable);
return http.build();
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
index a3e55c3f7..6d8e50285 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
@@ -22,18 +22,17 @@
package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.io.Closeable;
-import java.io.IOException;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import lombok.Getter;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.message.bus.event.Topic;
+import org.onap.policy.common.message.bus.event.TopicEndpointManager;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSource;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
@@ -139,7 +138,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
}
@Override
- public void close() throws IOException {
+ public void close() {
if (isAlive()) {
super.shutdown();
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
index a76a09d99..09408adea 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
@@ -20,9 +20,7 @@
package org.onap.policy.clamp.acm.runtime.config.messaging;
-import java.util.List;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.message.bus.event.TopicSink;
/**
* Publisher.
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
index 220636b9d..42af70596 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
@@ -22,14 +22,13 @@
package org.onap.policy.clamp.acm.runtime.instantiation;
import jakarta.validation.Valid;
-import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
+import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -38,15 +37,21 @@ import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.clamp.models.acm.concepts.SubState;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.AcInstanceStateUpdate;
+import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.InstantiationResponse;
+import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
+import org.onap.policy.clamp.models.acm.messages.rest.instantiation.SubOrder;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ObjectValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.models.base.PfConceptKey;
import org.onap.policy.models.base.PfKey;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.slf4j.Logger;
@@ -62,6 +67,7 @@ import org.springframework.transaction.annotation.Transactional;
@RequiredArgsConstructor
public class AutomationCompositionInstantiationProvider {
private static final String DO_NOT_MATCH = " do not match with ";
+ private static final String ELEMENT_ID_NOT_PRESENT = "Element id not present ";
private static final Logger LOGGER = LoggerFactory.getLogger(AutomationCompositionInstantiationProvider.class);
@@ -69,7 +75,7 @@ public class AutomationCompositionInstantiationProvider {
private final AcDefinitionProvider acDefinitionProvider;
private final AcInstanceStateResolver acInstanceStateResolver;
private final SupervisionAcHandler supervisionAcHandler;
- private final AcmParticipantProvider acmParticipantProvider;
+ private final ParticipantProvider participantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -82,19 +88,19 @@ public class AutomationCompositionInstantiationProvider {
public InstantiationResponse createAutomationComposition(UUID compositionId,
AutomationComposition automationComposition) {
if (!compositionId.equals(automationComposition.getCompositionId())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId);
}
var checkAutomationCompositionOpt =
automationCompositionProvider.findAutomationComposition(automationComposition.getKey().asIdentifier());
if (checkAutomationCompositionOpt.isPresent()) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getKey().asIdentifier() + " already defined");
}
var validationResult = validateAutomationComposition(automationComposition);
if (!validationResult.isValid()) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult());
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult());
}
automationComposition = automationCompositionProvider.createAutomationComposition(automationComposition);
@@ -120,7 +126,7 @@ public class AutomationCompositionInstantiationProvider {
var instanceId = automationComposition.getInstanceId();
var acToUpdate = automationCompositionProvider.getAutomationComposition(instanceId);
if (!compositionId.equals(acToUpdate.getCompositionId())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId);
}
if (DeployState.UNDEPLOYED.equals(acToUpdate.getDeployState())) {
@@ -131,22 +137,38 @@ public class AutomationCompositionInstantiationProvider {
acToUpdate.setDerivedFrom(automationComposition.getDerivedFrom());
var validationResult = validateAutomationComposition(acToUpdate);
if (!validationResult.isValid()) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult());
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult());
}
automationComposition = automationCompositionProvider.updateAutomationComposition(acToUpdate);
return createInstantiationResponse(automationComposition);
- } else if ((DeployState.DEPLOYED.equals(acToUpdate.getDeployState())
- || DeployState.UPDATING.equals(acToUpdate.getDeployState()))
- && LockState.LOCKED.equals(acToUpdate.getLockState())) {
- if (automationComposition.getCompositionTargetId() != null) {
- return migrateAutomationComposition(automationComposition, acToUpdate);
+ }
+
+ var deployOrder = DeployOrder.UPDATE;
+ var subOrder = SubOrder.NONE;
+
+ if (automationComposition.getCompositionTargetId() != null) {
+
+ if (Boolean.TRUE.equals(automationComposition.getPrecheck())) {
+ subOrder = SubOrder.MIGRATE_PRECHECK;
+ deployOrder = DeployOrder.NONE;
} else {
- return updateDeployedAutomationComposition(automationComposition, acToUpdate);
+ deployOrder = DeployOrder.MIGRATE;
}
}
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
- "Not allowed to update in the state " + acToUpdate.getDeployState());
+ var result = acInstanceStateResolver.resolve(deployOrder, LockOrder.NONE, subOrder,
+ acToUpdate.getDeployState(), acToUpdate.getLockState(), acToUpdate.getSubState(),
+ acToUpdate.getStateChangeResult());
+ return switch (result) {
+ case "UPDATE" -> updateDeployedAutomationComposition(automationComposition, acToUpdate);
+
+ case "MIGRATE" -> migrateAutomationComposition(automationComposition, acToUpdate);
+
+ case "MIGRATE_PRECHECK" -> migratePrecheckAc(automationComposition, acToUpdate);
+
+ default -> throw new PfModelRuntimeException(Status.BAD_REQUEST,
+ "Not allowed to " + deployOrder + " in the state " + acToUpdate.getDeployState());
+ };
}
/**
@@ -164,17 +186,14 @@ public class AutomationCompositionInstantiationProvider {
var elementId = element.getKey();
var dbAcElement = acToBeUpdated.getElements().get(elementId);
if (dbAcElement == null) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, "Element id not present " + elementId);
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, ELEMENT_ID_NOT_PRESENT + elementId);
}
AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties());
}
- if (automationComposition.getRestarting() != null) {
- throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Update not allowed");
- }
var validationResult = validateAutomationComposition(acToBeUpdated);
if (!validationResult.isValid()) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult());
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult());
}
// Publish property update event to the participants
supervisionAcHandler.update(acToBeUpdated);
@@ -187,47 +206,104 @@ public class AutomationCompositionInstantiationProvider {
AutomationComposition automationComposition, AutomationComposition acToBeUpdated) {
if (!DeployState.DEPLOYED.equals(acToBeUpdated.getDeployState())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
"Not allowed to migrate in the state " + acToBeUpdated.getDeployState());
}
- if (automationComposition.getRestarting() != null) {
- throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Migrate not allowed");
- }
// Iterate and update the element property values
for (var element : automationComposition.getElements().entrySet()) {
var elementId = element.getKey();
var dbAcElement = acToBeUpdated.getElements().get(elementId);
+ // Add additional elements if present for migration
if (dbAcElement == null) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, "Element id not present " + elementId);
- }
- AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties());
- var newDefinition = element.getValue().getDefinition();
- var compatibility =
- newDefinition.asConceptKey().getCompatibility(dbAcElement.getDefinition().asConceptKey());
- if (PfKey.Compatibility.DIFFERENT.equals(compatibility)) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
- dbAcElement.getDefinition() + " is not compatible with " + newDefinition);
- }
- if (PfKey.Compatibility.MAJOR.equals(compatibility) || PfKey.Compatibility.MINOR.equals(compatibility)) {
- LOGGER.warn("Migrate {}: Version {} has {} compatibility with {} ",
- automationComposition.getInstanceId(), newDefinition, compatibility, dbAcElement.getDefinition());
+ LOGGER.info("New Ac element {} added in Migration", elementId);
+ acToBeUpdated.getElements().put(elementId, element.getValue());
+ } else {
+ AcmUtils.recursiveMerge(dbAcElement.getProperties(), element.getValue().getProperties());
+ var newDefinition = element.getValue().getDefinition().asConceptKey();
+ var dbElementDefinition = dbAcElement.getDefinition().asConceptKey();
+ checkCompatibility(newDefinition, dbElementDefinition, automationComposition.getInstanceId());
+ dbAcElement.setDefinition(element.getValue().getDefinition());
}
- dbAcElement.setDefinition(element.getValue().getDefinition());
}
+ // Remove element which is not present in the new Ac instance
+ var elementsRemoved = getElementRemoved(acToBeUpdated, automationComposition);
+ elementsRemoved.forEach(uuid -> acToBeUpdated.getElements().remove(uuid));
var validationResult =
- validateAutomationComposition(acToBeUpdated, automationComposition.getCompositionTargetId());
+ validateAutomationComposition(acToBeUpdated, automationComposition.getCompositionTargetId());
if (!validationResult.isValid()) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, validationResult.getResult());
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult());
}
acToBeUpdated.setCompositionTargetId(automationComposition.getCompositionTargetId());
+ var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionTargetId());
+ // Publish migrate event to the participants
+ supervisionAcHandler.migrate(acToBeUpdated, acDefinition.getServiceTemplate());
+
+ var ac = automationCompositionProvider.updateAutomationComposition(acToBeUpdated);
+ elementsRemoved.forEach(automationCompositionProvider::deleteAutomationCompositionElement);
+ return createInstantiationResponse(ac);
+ }
+
+ private List<UUID> getElementRemoved(AutomationComposition acFromDb, AutomationComposition acFromMigration) {
+ return acFromDb.getElements().keySet().stream()
+ .filter(id -> acFromMigration.getElements().get(id) == null).toList();
+ }
+
+ void checkCompatibility(PfConceptKey newDefinition, PfConceptKey dbElementDefinition,
+ UUID instanceId) {
+ var compatibility = newDefinition.getCompatibility(dbElementDefinition);
+ if (PfKey.Compatibility.DIFFERENT.equals(compatibility)) {
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
+ dbElementDefinition + " is not compatible with " + newDefinition);
+ }
+ if (PfKey.Compatibility.MAJOR.equals(compatibility) || PfKey.Compatibility.MINOR
+ .equals(compatibility)) {
+ LOGGER.warn("Migrate {}: Version {} has {} compatibility with {} ", instanceId, newDefinition,
+ compatibility, dbElementDefinition);
+ }
+ }
+
+ private InstantiationResponse migratePrecheckAc(
+ AutomationComposition automationComposition, AutomationComposition acToBeUpdated) {
+
+ acToBeUpdated.setPrecheck(true);
+ var copyAc = new AutomationComposition(acToBeUpdated);
+ // Iterate and update the element property values
+ for (var element : automationComposition.getElements().entrySet()) {
+ var elementId = element.getKey();
+ var copyElement = copyAc.getElements().get(elementId);
+ // Add additional elements if present for migration
+ if (copyElement == null) {
+ LOGGER.info("New Ac element {} added in Migration", elementId);
+ copyAc.getElements().put(elementId, element.getValue());
+ } else {
+ AcmUtils.recursiveMerge(copyElement.getProperties(), element.getValue().getProperties());
+ var newDefinition = element.getValue().getDefinition().asConceptKey();
+ var copyElementDefinition = copyElement.getDefinition().asConceptKey();
+ checkCompatibility(newDefinition, copyElementDefinition, automationComposition.getInstanceId());
+ copyElement.setDefinition(element.getValue().getDefinition());
+ }
+ }
+ // Remove element which is not present in the new Ac instance
+ var elementsRemoved = getElementRemoved(copyAc, automationComposition);
+ elementsRemoved.forEach(uuid -> copyAc.getElements().remove(uuid));
+
+ var validationResult =
+ validateAutomationComposition(copyAc, automationComposition.getCompositionTargetId());
+ if (!validationResult.isValid()) {
+ throw new PfModelRuntimeException(Status.BAD_REQUEST, validationResult.getResult());
+ }
+ copyAc.setCompositionTargetId(automationComposition.getCompositionTargetId());
// Publish migrate event to the participants
- supervisionAcHandler.migrate(acToBeUpdated, automationComposition.getCompositionTargetId());
+ supervisionAcHandler.migratePrecheck(copyAc);
- automationComposition = automationCompositionProvider.updateAutomationComposition(acToBeUpdated);
- return createInstantiationResponse(automationComposition);
+ AcmUtils.setCascadedState(acToBeUpdated, DeployState.DEPLOYED, LockState.LOCKED,
+ SubState.MIGRATION_PRECHECKING);
+ acToBeUpdated.setStateChangeResult(StateChangeResult.NO_ERROR);
+
+ return createInstantiationResponse(automationCompositionProvider.updateAutomationComposition(acToBeUpdated));
}
private BeanValidationResult validateAutomationComposition(AutomationComposition automationComposition) {
@@ -256,16 +332,10 @@ public class AutomationCompositionInstantiationProvider {
ValidationStatus.INVALID, "Commissioned automation composition definition not primed"));
return result;
}
- if (acDefinitionOpt.get().getRestarting() != null) {
- result.addResult(
- new ObjectValidationResult("ServiceTemplate.restarting", acDefinitionOpt.get().getRestarting(),
- ValidationStatus.INVALID, "There is a restarting process in composition"));
- return result;
- }
var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
result.addResult(AcmUtils.validateAutomationComposition(automationComposition,
acDefinitionOpt.get().getServiceTemplate(),
@@ -296,7 +366,7 @@ public class AutomationCompositionInstantiationProvider {
var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId);
if (!compositionId.equals(automationComposition.getCompositionId())
&& !compositionId.equals(automationComposition.getCompositionTargetId())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId);
}
return automationComposition;
@@ -312,26 +382,23 @@ public class AutomationCompositionInstantiationProvider {
public InstantiationResponse deleteAutomationComposition(UUID compositionId, UUID instanceId) {
var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId);
if (!compositionId.equals(automationComposition.getCompositionId())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId);
}
if (!DeployState.UNDEPLOYED.equals(automationComposition.getDeployState())
&& !DeployState.DELETING.equals(automationComposition.getDeployState())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
"Automation composition state is still " + automationComposition.getDeployState());
}
if (DeployState.DELETING.equals(automationComposition.getDeployState())
&& StateChangeResult.NO_ERROR.equals(automationComposition.getStateChangeResult())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
"Automation composition state is still " + automationComposition.getDeployState());
}
- if (automationComposition.getRestarting() != null) {
- throw new PfModelRuntimeException(Status.BAD_REQUEST, "There is a restarting process, Delete not allowed");
- }
var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
supervisionAcHandler.delete(automationComposition, acDefinition);
var response = new InstantiationResponse();
response.setInstanceId(automationComposition.getInstanceId());
@@ -366,7 +433,7 @@ public class AutomationCompositionInstantiationProvider {
@Valid AcInstanceStateUpdate acInstanceStateUpdate) {
var automationComposition = automationCompositionProvider.getAutomationComposition(instanceId);
if (!compositionId.equals(automationComposition.getCompositionId())) {
- throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ throw new PfModelRuntimeException(Status.BAD_REQUEST,
automationComposition.getCompositionId() + DO_NOT_MATCH + compositionId);
}
var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
@@ -374,10 +441,11 @@ public class AutomationCompositionInstantiationProvider {
var participantIds = acDefinition.getElementStateMap().values().stream()
.map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(),
- acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(),
- automationComposition.getLockState(), automationComposition.getStateChangeResult());
+ acInstanceStateUpdate.getLockOrder(), acInstanceStateUpdate.getSubOrder(),
+ automationComposition.getDeployState(), automationComposition.getLockState(),
+ automationComposition.getSubState(), automationComposition.getStateChangeResult());
switch (result) {
case "DEPLOY":
supervisionAcHandler.deploy(automationComposition, acDefinition);
@@ -395,6 +463,14 @@ public class AutomationCompositionInstantiationProvider {
supervisionAcHandler.unlock(automationComposition, acDefinition);
break;
+ case "PREPARE":
+ supervisionAcHandler.prepare(automationComposition);
+ break;
+
+ case "REVIEW":
+ supervisionAcHandler.review(automationComposition);
+ break;
+
default:
throw new PfModelRuntimeException(Status.BAD_REQUEST, "Not valid " + acInstanceStateUpdate);
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
index a0b6fe13e..67befe394 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
@@ -24,7 +24,7 @@ import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.Setter;
-import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.parameters.topic.TopicParameterGroup;
import org.onap.policy.common.parameters.validation.ParameterGroupConstraint;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
@@ -53,5 +53,5 @@ public class AcRuntimeParameterGroup {
@Valid
@NotNull
- private Topics topics;
+ private Topics topics = new Topics();
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
index d485a24ba..6e230d3df 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
@@ -20,6 +20,7 @@ package org.onap.policy.clamp.acm.runtime.main.parameters;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.validation.annotation.Validated;
@@ -27,6 +28,7 @@ import org.springframework.validation.annotation.Validated;
@Setter
@Validated
@AllArgsConstructor
+@NoArgsConstructor
public class Topics {
private String operationTopic;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java
index 855681e69..1136dcb16 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/rest/ParticipantController.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation.
+ * Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,8 +41,7 @@ public class ParticipantController extends AbstractRestController implements Par
@Override
public ResponseEntity<ParticipantInformation> getParticipant(UUID participantId, UUID requestId) {
- ParticipantInformation participantInformation = acmParticipantProvider
- .getParticipantById(participantId);
+ var participantInformation = acmParticipantProvider.getParticipantById(participantId);
return ResponseEntity.ok().body(participantInformation);
}
@@ -61,7 +60,7 @@ public class ParticipantController extends AbstractRestController implements Par
@Override
public ResponseEntity<List<ParticipantInformation>> queryParticipants(String name, String version,
UUID requestId) {
- List<ParticipantInformation> participantInformationList = acmParticipantProvider.getAllParticipants();
+ var participantInformationList = acmParticipantProvider.getAllParticipants();
return ResponseEntity.ok().body(participantInformationList);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
index 13382e0fb..62ba7b017 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation.
+ * Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,12 +20,10 @@
package org.onap.policy.clamp.acm.runtime.participants;
-import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.MapUtils;
@@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
-import org.onap.policy.models.base.PfModelRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -94,12 +90,11 @@ public class AcmParticipantProvider {
* @param participantId The UUID of the participant to send request to
*/
public void sendParticipantStatusRequest(UUID participantId) {
- var participant = this.participantProvider.getParticipantById(participantId);
+ // check if participant is present
+ this.participantProvider.getParticipantById(participantId);
LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq");
participantStatusReqPublisher.send(participantId);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.updateParticipant(participant);
}
/**
@@ -110,22 +105,6 @@ public class AcmParticipantProvider {
this.participantStatusReqPublisher.send((UUID) null);
}
- /**
- * Verify Participant state.
- *
- * @param participantIds The list of UUIDs of the participants to get
- * @throws PfModelRuntimeException in case the participant is offline
- */
- public void verifyParticipantState(Set<UUID> participantIds) {
- for (UUID participantId : participantIds) {
- var participant = this.participantProvider.getParticipantById(participantId);
- if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) {
- throw new PfModelRuntimeException(Response.Status.CONFLICT,
- "Participant: " + participantId + " is OFFLINE");
- }
- }
- }
-
private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) {
var automationCompositionElements = participantProvider
.getAutomationCompositionElements(participantId);
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
index 802c6603b..4f564478f 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
@@ -23,27 +23,30 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
import io.opentelemetry.context.Context;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.clamp.models.acm.concepts.SubState;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -58,12 +61,15 @@ public class SupervisionAcHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class);
private final AutomationCompositionProvider automationCompositionProvider;
+ private final AcDefinitionProvider acDefinitionProvider;
// Publishers for participant communication
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AcElementPropertiesPublisher acElementPropertiesPublisher;
private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
+ private final AcPreparePublisher acPreparePublisher;
private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1));
@@ -146,6 +152,30 @@ public class SupervisionAcHandler {
}
/**
+ * Handle prepare Pre Deploy an AutomationComposition instance.
+ *
+ * @param automationComposition the AutomationComposition
+ */
+ public void prepare(AutomationComposition automationComposition) {
+ AcmUtils.setCascadedState(automationComposition, DeployState.UNDEPLOYED, LockState.NONE, SubState.PREPARING);
+ automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
+ automationCompositionProvider.updateAutomationComposition(automationComposition);
+ executor.execute(() -> acPreparePublisher.sendPrepare(automationComposition));
+ }
+
+ /**
+ * Handle prepare Post Deploy an AutomationComposition instance.
+ *
+ * @param automationComposition the AutomationComposition
+ */
+ public void review(AutomationComposition automationComposition) {
+ AcmUtils.setCascadedState(automationComposition, DeployState.DEPLOYED, LockState.LOCKED, SubState.REVIEWING);
+ automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
+ automationCompositionProvider.updateAutomationComposition(automationComposition);
+ executor.execute(() -> acPreparePublisher.sendRevew(automationComposition));
+ }
+
+ /**
* Handle Lock an AutomationComposition instance.
*
* @param automationComposition the AutomationComposition
@@ -227,10 +257,14 @@ public class SupervisionAcHandler {
}
private void setAcElementStateInDb(AutomationCompositionDeployAck automationCompositionAckMessage) {
+ if (!validateMessage(automationCompositionAckMessage)) {
+ return;
+ }
+
var automationCompositionOpt = automationCompositionProvider
.findAutomationComposition(automationCompositionAckMessage.getAutomationCompositionId());
if (automationCompositionOpt.isEmpty()) {
- LOGGER.warn("AutomationComposition not found in database {}",
+ LOGGER.error("AutomationComposition not found in database {}",
automationCompositionAckMessage.getAutomationCompositionId());
return;
}
@@ -239,14 +273,7 @@ public class SupervisionAcHandler {
if (automationCompositionAckMessage.getAutomationCompositionResultMap() == null
|| automationCompositionAckMessage.getAutomationCompositionResultMap().isEmpty()) {
if (DeployState.DELETING.equals(automationComposition.getDeployState())) {
- // scenario when Automation Composition instance has never been deployed
- for (var element : automationComposition.getElements().values()) {
- if (element.getParticipantId().equals(automationCompositionAckMessage.getParticipantId())) {
- element.setDeployState(DeployState.DELETED);
- automationCompositionProvider.updateAutomationCompositionElement(element,
- automationComposition.getInstanceId());
- }
- }
+ deleteAcInstance(automationComposition, automationCompositionAckMessage.getParticipantId());
} else {
LOGGER.warn("Empty AutomationCompositionResultMap {} {}",
automationCompositionAckMessage.getAutomationCompositionId(),
@@ -257,15 +284,51 @@ public class SupervisionAcHandler {
var updated = updateState(automationComposition,
automationCompositionAckMessage.getAutomationCompositionResultMap().entrySet(),
- automationCompositionAckMessage.getStateChangeResult());
+ automationCompositionAckMessage.getStateChangeResult(), automationCompositionAckMessage.getStage());
if (updated) {
- automationCompositionProvider.updateAutomationComposition(automationComposition);
+ automationComposition = automationCompositionProvider.updateAcState(automationComposition);
+ var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
+ participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition);
+ }
+ }
+
+ private boolean validateMessage(AutomationCompositionDeployAck acAckMessage) {
+ if (acAckMessage.getAutomationCompositionId() == null
+ || acAckMessage.getStateChangeResult() == null) {
+ LOGGER.error("Not valid AutomationCompositionDeployAck message");
+ return false;
+ }
+ if (!StateChangeResult.NO_ERROR.equals(acAckMessage.getStateChangeResult())
+ && !StateChangeResult.FAILED.equals(acAckMessage.getStateChangeResult())) {
+ LOGGER.error("Not valid AutomationCompositionDeployAck message, stateChangeResult is not valid {} ",
+ acAckMessage.getStateChangeResult());
+ return false;
+ }
+
+ if (acAckMessage.getStage() == null) {
+ for (var el : acAckMessage.getAutomationCompositionResultMap().values()) {
+ if (AcmUtils.isInTransitionalState(el.getDeployState(), el.getLockState(), SubState.NONE)) {
+ LOGGER.error("Not valid AutomationCompositionDeployAck message, states are not valid");
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void deleteAcInstance(AutomationComposition automationComposition, UUID participantId) {
+ // scenario when Automation Composition instance has never been deployed
+ for (var element : automationComposition.getElements().values()) {
+ if (element.getParticipantId().equals(participantId)) {
+ element.setDeployState(DeployState.DELETED);
+ automationCompositionProvider.updateAutomationCompositionElement(element);
+ }
}
}
private boolean updateState(AutomationComposition automationComposition,
Set<Map.Entry<UUID, AcElementDeployAck>> automationCompositionResultSet,
- StateChangeResult stateChangeResult) {
+ StateChangeResult stateChangeResult, Integer stage) {
var updated = false;
boolean inProgress = !StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult());
if (inProgress && !stateChangeResult.equals(automationComposition.getStateChangeResult())) {
@@ -276,24 +339,14 @@ public class SupervisionAcHandler {
for (var acElementAck : automationCompositionResultSet) {
var element = automationComposition.getElements().get(acElementAck.getKey());
if (element != null) {
- element.setMessage(acElementAck.getValue().getMessage());
- element.setOutProperties(acElementAck.getValue().getOutProperties());
- element.setOperationalState(acElementAck.getValue().getOperationalState());
- element.setUseState(acElementAck.getValue().getUseState());
+ element.setMessage(AcmUtils.validatedMessage(acElementAck.getValue().getMessage()));
+ if (stage == null) {
+ element.setSubState(SubState.NONE);
+ }
element.setDeployState(acElementAck.getValue().getDeployState());
element.setLockState(acElementAck.getValue().getLockState());
- element.setRestarting(null);
- automationCompositionProvider.updateAutomationCompositionElement(element,
- automationComposition.getInstanceId());
- }
- }
-
- if (automationComposition.getRestarting() != null) {
- var restarting = automationComposition.getElements().values().stream()
- .map(AutomationCompositionElement::getRestarting).filter(Objects::nonNull).findAny();
- if (restarting.isEmpty()) {
- automationComposition.setRestarting(null);
- updated = true;
+ element.setStage(stage);
+ automationCompositionProvider.updateAutomationCompositionElement(element);
}
}
@@ -304,12 +357,22 @@ public class SupervisionAcHandler {
* Handle Migration of an AutomationComposition instance to other ACM Definition.
*
* @param automationComposition the AutomationComposition
- * @param compositionTargetId the ACM Definition Id
+ * @param serviceTemplate the ServiceTemplate
*/
- public void migrate(AutomationComposition automationComposition, UUID compositionTargetId) {
+ public void migrate(AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) {
AcmUtils.setCascadedState(automationComposition, DeployState.MIGRATING, LockState.LOCKED);
+ var stage = ParticipantUtils.getFirstStage(automationComposition, serviceTemplate);
automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
- executor.execute(
- () -> acCompositionMigrationPublisher.send(automationComposition, compositionTargetId));
+ automationComposition.setPhase(stage);
+ executor.execute(() -> acCompositionMigrationPublisher.send(automationComposition, stage));
+ }
+
+ /**
+ * Handle Migration precheck of an AutomationComposition instance to other ACM Definition.
+ *
+ * @param automationComposition the AutomationComposition
+ */
+ public void migratePrecheck(AutomationComposition automationComposition) {
+ executor.execute(() -> acCompositionMigrationPublisher.send(automationComposition, 0));
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
index 8f3a4c2eb..9ef979f8e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
@@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
private final SupervisionScanner supervisionScanner;
- private final SupervisionPartecipantScanner partecipantScanner;
+ private final SupervisionParticipantScanner participantScanner;
- private ThreadPoolExecutor executor =
+ private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
@Scheduled(
@@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable {
private void executeScan() {
supervisionScanner.run();
- partecipantScanner.run();
+ participantScanner.run();
}
/**
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
index 963e4830e..f13f5da2c 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
@@ -23,12 +23,14 @@ package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -43,7 +45,7 @@ public class SupervisionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
private final AcDefinitionProvider acDefinitionProvider;
- private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+ private final ParticipantSyncPublisher participantSyncPublisher;
/**
* Handle a ParticipantPrimeAck message from a participant.
@@ -52,6 +54,22 @@ public class SupervisionHandler {
*/
@Timed(value = "listener.participant_prime_ack", description = "PARTICIPANT_PRIME_ACK messages received")
public void handleParticipantMessage(ParticipantPrimeAck participantPrimeAckMessage) {
+ if (participantPrimeAckMessage.getCompositionId() == null
+ || participantPrimeAckMessage.getCompositionState() == null
+ || participantPrimeAckMessage.getStateChangeResult() == null) {
+ LOGGER.error("Not valid ParticipantPrimeAck message");
+ return;
+ }
+ if (AcTypeState.PRIMING.equals(participantPrimeAckMessage.getCompositionState())
+ || AcTypeState.DEPRIMING.equals(participantPrimeAckMessage.getCompositionState())) {
+ LOGGER.error("Not valid state {}", participantPrimeAckMessage.getCompositionState());
+ return;
+ }
+ if (!StateChangeResult.NO_ERROR.equals(participantPrimeAckMessage.getStateChangeResult())
+ && !StateChangeResult.FAILED.equals(participantPrimeAckMessage.getStateChangeResult())) {
+ LOGGER.error("Vot valid stateChangeResult {} ", participantPrimeAckMessage.getStateChangeResult());
+ return;
+ }
var acDefinitionOpt = acDefinitionProvider.findAcDefinition(participantPrimeAckMessage.getCompositionId());
if (acDefinitionOpt.isEmpty()) {
LOGGER.warn("AC Definition not found in database {}", participantPrimeAckMessage.getCompositionId());
@@ -59,7 +77,7 @@ public class SupervisionHandler {
}
var acDefinition = acDefinitionOpt.get();
if (!AcTypeState.PRIMING.equals(acDefinition.getState())
- && !AcTypeState.DEPRIMING.equals(acDefinition.getState()) && acDefinition.getRestarting() == null) {
+ && !AcTypeState.DEPRIMING.equals(acDefinition.getState())) {
LOGGER.error("AC Definition {} already primed/deprimed with participant {}",
participantPrimeAckMessage.getCompositionId(), participantPrimeAckMessage.getParticipantId());
return;
@@ -80,20 +98,11 @@ public class SupervisionHandler {
}
boolean completed = true;
- boolean restarting = false;
for (var element : acDefinition.getElementStateMap().values()) {
- if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
- element.setMessage(participantPrimeAckMessage.getMessage());
- element.setState(participantPrimeAckMessage.getCompositionState());
- element.setRestarting(null);
- acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId());
- }
+ handlePrimeAckElement(participantPrimeAckMessage, element);
if (!finalState.equals(element.getState())) {
completed = false;
}
- if (element.getRestarting() != null) {
- restarting = true;
- }
}
if (inProgress && !msgInErrors && completed) {
@@ -103,13 +112,20 @@ public class SupervisionHandler {
acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
}
}
- if (!restarting && acDefinition.getRestarting() != null) {
- toUpdate = true;
- acDefinition.setRestarting(null);
- }
if (toUpdate) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
- acDefinition.getStateChangeResult(), acDefinition.getRestarting());
+ acDefinition.getStateChangeResult());
+ if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) {
+ participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId());
+ }
+ }
+ }
+
+ private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) {
+ if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
+ element.setMessage(AcmUtils.validatedMessage(participantPrimeAckMessage.getMessage()));
+ element.setState(participantPrimeAckMessage.getCompositionState());
+ acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
index d1efb6ac0..3eb471609 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
@@ -21,7 +21,6 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import io.micrometer.core.annotation.Timed;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,15 +30,16 @@ import org.apache.commons.collections4.MapUtils;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
@@ -64,7 +64,7 @@ public class SupervisionParticipantHandler {
private final ParticipantDeregisterAckPublisher participantDeregisterAckPublisher;
private final AutomationCompositionProvider automationCompositionProvider;
private final AcDefinitionProvider acDefinitionProvider;
- private final ParticipantRestartPublisher participantRestartPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -72,24 +72,14 @@ public class SupervisionParticipantHandler {
*
* @param participantRegisterMsg the ParticipantRegister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- checkOnline(participant);
- handleRestart(participant.getParticipantId());
- } else {
- var participant = createParticipant(participantRegisterMsg.getParticipantId(),
- listToMap(participantRegisterMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
-
- }
+ saveIfNotPresent(participantRegisterMsg.getReplicaId(),
+ participantRegisterMsg.getParticipantId(),
+ participantRegisterMsg.getParticipantSupportedElementType(), true);
participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
- participantRegisterMsg.getParticipantId());
+ participantRegisterMsg.getParticipantId(), participantRegisterMsg.getReplicaId());
}
/**
@@ -97,15 +87,13 @@ public class SupervisionParticipantHandler {
*
* @param participantDeregisterMsg the ParticipantDeregister message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
- var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId());
-
- if (participantOpt.isPresent()) {
- var participant = participantOpt.get();
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.updateParticipant(participant);
+ var replicaId = participantDeregisterMsg.getReplicaId() != null
+ ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ participantProvider.deleteParticipantReplica(replicaId);
}
participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
@@ -116,32 +104,68 @@ public class SupervisionParticipantHandler {
*
* @param participantStatusMsg the ParticipantStatus message received from a participant
*/
- @MessageIntercept
@Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
+ saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
+ participantStatusMsg.getParticipantSupportedElementType(), false);
- var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId());
- if (participantOpt.isEmpty()) {
- var participant = createParticipant(participantStatusMsg.getParticipantId(),
- listToMap(participantStatusMsg.getParticipantSupportedElementType()));
- participantProvider.saveParticipant(participant);
- } else {
- checkOnline(participantOpt.get());
- }
if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
- automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList());
+ updateAcOutProperties(participantStatusMsg.getAutomationCompositionInfoList(),
+ participantStatusMsg.getCompositionId());
}
if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
&& participantStatusMsg.getCompositionId() != null) {
updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
- participantStatusMsg.getParticipantDefinitionUpdates());
+ participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
}
}
- private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) {
- var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId);
+ private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
+ List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
+ var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
+ var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+ if (replicaOpt.isPresent()) {
+ var replica = replicaOpt.get();
+ checkOnline(replica);
+ } else {
+ var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
+ participant.getReplicas().put(replicaId, createReplica(replicaId));
+ participantProvider.saveParticipant(participant);
+ }
+ if (registration) {
+ handleRestart(participantId, replicaId);
+ }
+ }
+
+ private Participant getParticipant(UUID participantId,
+ Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
+ var participantOpt = participantProvider.findParticipant(participantId);
+ return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
+ }
+
+ private ParticipantReplica createReplica(UUID replicaId) {
+ var replica = new ParticipantReplica();
+ replica.setReplicaId(replicaId);
+ replica.setParticipantState(ParticipantState.ON_LINE);
+ replica.setLastMsg(TimestampHelper.now());
+ return replica;
+
+ }
+
+ private void updateAcOutProperties(List<AutomationCompositionInfo> automationCompositionInfoList,
+ UUID compositionId) {
+ automationCompositionProvider.upgradeStates(automationCompositionInfoList);
+ var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
+ for (var acInfo : automationCompositionInfoList) {
+ var ac = automationCompositionProvider.getAutomationComposition(acInfo.getAutomationCompositionId());
+ participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), ac);
+ }
+ }
+
+ private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
+ var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
if (acDefinitionOpt.isEmpty()) {
- LOGGER.error("Ac Definition with id {} not found", composotionId);
+ LOGGER.error("Ac Definition with id {} not found", compositionId);
return;
}
var acDefinition = acDefinitionOpt.get();
@@ -155,71 +179,47 @@ public class SupervisionParticipantHandler {
}
acDefinitionProvider.updateAcDefinition(acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+ participantSyncPublisher.sendSync(acDefinition, replicaId);
}
- private void checkOnline(Participant participant) {
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- participant.setParticipantState(ParticipantState.ON_LINE);
+ private void checkOnline(ParticipantReplica replica) {
+ if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
+ replica.setParticipantState(ParticipantState.ON_LINE);
}
- participant.setLastMsg(TimestampHelper.now());
- participantProvider.saveParticipant(participant);
+ replica.setLastMsg(TimestampHelper.now());
+ participantProvider.saveParticipantReplica(replica);
}
- private void handleRestart(UUID participantId) {
+ private void handleRestart(UUID participantId, UUID replicaId) {
var compositionIds = participantProvider.getCompositionIds(participantId);
for (var compositionId : compositionIds) {
var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
- handleRestart(participantId, acDefinition);
+ handleSyncRestart(participantId, replicaId, acDefinition);
}
}
- private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) {
+ private void handleSyncRestart(final UUID participantId, UUID replicaId,
+ AutomationCompositionDefinition acDefinition) {
if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
return;
}
LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
- for (var elementState : acDefinition.getElementStateMap().values()) {
- if (participantId.equals(elementState.getParticipantId())) {
- elementState.setRestarting(true);
- }
- }
var automationCompositionList =
automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
- List<AutomationComposition> automationCompositions = new ArrayList<>();
- for (var automationComposition : automationCompositionList) {
- if (isAcToBeRestarted(participantId, automationComposition)) {
- automationCompositions.add(automationComposition);
- }
- }
- // expected final state
- if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
- acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
- }
- acDefinition.setRestarting(true);
- acDefinitionProvider.updateAcDefinition(acDefinition,
- acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
- participantRestartPublisher.send(participantId, acDefinition, automationCompositions);
+ var automationCompositions = automationCompositionList.stream()
+ .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
+ participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
}
- private boolean isAcToBeRestarted(UUID participantId, AutomationComposition automationComposition) {
- boolean toAdd = false;
+ private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
for (var element : automationComposition.getElements().values()) {
if (participantId.equals(element.getParticipantId())) {
- element.setRestarting(true);
- toAdd = true;
- }
- }
- if (toAdd) {
- automationComposition.setRestarting(true);
- // expected final state
- if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) {
- automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
+ return true;
}
- automationCompositionProvider.updateAutomationComposition(automationComposition);
}
- return toAdd;
+ return false;
}
private Participant createParticipant(UUID participantId,
@@ -227,8 +227,6 @@ public class SupervisionParticipantHandler {
var participant = new Participant();
participant.setParticipantId(participantId);
participant.setParticipantSupportedElementTypes(participantSupportedElementType);
- participant.setParticipantState(ParticipantState.ON_LINE);
- participant.setLastMsg(TimestampHelper.now());
return participant;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
index 4d2a22f26..4ada199b6 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java
@@ -21,8 +21,7 @@
package org.onap.policy.clamp.acm.runtime.supervision;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
import org.slf4j.Logger;
@@ -33,20 +32,20 @@ import org.springframework.stereotype.Component;
* This class is used to scan the automation compositions in the database and check if they are in the correct state.
*/
@Component
-public class SupervisionPartecipantScanner {
- private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class);
+public class SupervisionParticipantScanner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class);
private final long maxWaitMs;
private final ParticipantProvider participantProvider;
/**
- * Constructor for instantiating SupervisionPartecipantScanner.
+ * Constructor for instantiating SupervisionParticipantScanner.
*
* @param participantProvider the Participant Provider
* @param acRuntimeParameterGroup the parameters for the automation composition runtime
*/
- public SupervisionPartecipantScanner(final ParticipantProvider participantProvider,
+ public SupervisionParticipantScanner(final ParticipantProvider participantProvider,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.participantProvider = participantProvider;
this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
@@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner {
* Run Scanning.
*/
public void run() {
- LOGGER.debug("Scanning participans in the database . . .");
-
- for (var participant : participantProvider.getParticipants()) {
- scanParticipantStatus(participant);
- }
-
- LOGGER.debug("Participans scan complete . . .");
+ LOGGER.debug("Scanning participants in the database . . .");
+ participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus);
+ LOGGER.debug("Participants scan complete . . .");
}
- private void scanParticipantStatus(Participant participant) {
- var id = participant.getParticipantId();
- if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
- LOGGER.debug("report Participant is still OFF_LINE {}", id);
- return;
- }
+ private void scanParticipantReplicaStatus(ParticipantReplica replica) {
var now = TimestampHelper.nowEpochMilli();
- var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg());
+ var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg());
if ((now - lastMsg) > maxWaitMs) {
- LOGGER.debug("report Participant OFF_LINE {}", id);
- participant.setParticipantState(ParticipantState.OFF_LINE);
- participantProvider.updateParticipant(participant);
+ LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId());
+ participantProvider.deleteParticipantReplica(replica.getReplicaId());
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
index 06d464671..db67e5eea 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
@@ -22,17 +22,21 @@
package org.onap.policy.clamp.acm.runtime.supervision;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.UUID;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.clamp.models.acm.concepts.SubState;
import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
@@ -55,6 +59,8 @@ public class SupervisionScanner {
private final AcDefinitionProvider acDefinitionProvider;
private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
+ private final ParticipantSyncPublisher participantSyncPublisher;
+ private final AutomationCompositionMigrationPublisher automationCompositionMigrationPublisher;
/**
* Constructor for instantiating SupervisionScanner.
@@ -69,11 +75,15 @@ public class SupervisionScanner {
final AcDefinitionProvider acDefinitionProvider,
final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher,
final AutomationCompositionDeployPublisher automationCompositionDeployPublisher,
+ final ParticipantSyncPublisher participantSyncPublisher,
+ final AutomationCompositionMigrationPublisher automationCompositionMigrationPublisher,
final AcRuntimeParameterGroup acRuntimeParameterGroup) {
this.automationCompositionProvider = automationCompositionProvider;
this.acDefinitionProvider = acDefinitionProvider;
this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher;
this.automationCompositionDeployPublisher = automationCompositionDeployPublisher;
+ this.participantSyncPublisher = participantSyncPublisher;
+ this.automationCompositionMigrationPublisher = automationCompositionMigrationPublisher;
this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
}
@@ -91,11 +101,9 @@ public class SupervisionScanner {
var acList = automationCompositionProvider.getAcInstancesInTransition();
HashMap<UUID, AutomationCompositionDefinition> acDefinitionMap = new HashMap<>();
for (var automationComposition : acList) {
- var acDefinition = acDefinitionMap.get(automationComposition.getCompositionId());
- if (acDefinition == null) {
- acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
- acDefinitionMap.put(acDefinition.getCompositionId(), acDefinition);
- }
+ var compositionId = automationComposition.getCompositionTargetId() != null
+ ? automationComposition.getCompositionTargetId() : automationComposition.getCompositionId();
+ var acDefinition = acDefinitionMap.computeIfAbsent(compositionId, acDefinitionProvider::getAcDefinition);
scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate());
}
LOGGER.debug("Automation composition scan complete . . .");
@@ -113,11 +121,13 @@ public class SupervisionScanner {
for (var element : acDefinition.getElementStateMap().values()) {
if (!finalState.equals(element.getState())) {
completed = false;
+ break;
}
}
if (completed) {
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState,
- StateChangeResult.NO_ERROR, null);
+ StateChangeResult.NO_ERROR);
+ participantSyncPublisher.sendSync(acDefinition, null);
} else {
handleTimeout(acDefinition);
}
@@ -128,14 +138,33 @@ public class SupervisionScanner {
LOGGER.debug("scanning automation composition {} . . .", automationComposition.getInstanceId());
if (!AcmUtils.isInTransitionalState(automationComposition.getDeployState(),
- automationComposition.getLockState())
+ automationComposition.getLockState(), automationComposition.getSubState())
|| StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) {
LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
- // Clear Timeout on automation composition
return;
}
+ if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
+ scanStage(automationComposition, serviceTemplate);
+ } else if (DeployState.UPDATING.equals(automationComposition.getDeployState())
+ || SubState.PREPARING.equals(automationComposition.getSubState())
+ || SubState.REVIEWING.equals(automationComposition.getSubState())
+ || SubState.MIGRATION_PRECHECKING.equals(automationComposition.getSubState())) {
+ simpleScan(automationComposition, serviceTemplate);
+ } else {
+ scanWithPhase(automationComposition, serviceTemplate);
+ }
+ }
+
+ /**
+ * Scan with startPhase: DEPLOY, UNDEPLOY, LOCK and UNLOCK.
+ *
+ * @param automationComposition the AutomationComposition
+ * @param serviceTemplate the ToscaServiceTemplate
+ */
+ private void scanWithPhase(final AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate) {
var completed = true;
var minSpNotCompleted = 1000; // min startPhase not completed
var maxSpNotCompleted = 0; // max startPhase not completed
@@ -147,7 +176,8 @@ public class SupervisionScanner {
int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties());
defaultMin = Math.min(defaultMin, startPhase);
defaultMax = Math.max(defaultMax, startPhase);
- if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) {
+ if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(),
+ element.getSubState())) {
completed = false;
minSpNotCompleted = Math.min(minSpNotCompleted, startPhase);
maxSpNotCompleted = Math.max(maxSpNotCompleted, startPhase);
@@ -155,35 +185,86 @@ public class SupervisionScanner {
}
if (completed) {
- LOGGER.debug("automation composition scan: transition state {} {} completed",
- automationComposition.getDeployState(), automationComposition.getLockState());
-
- complete(automationComposition);
+ complete(automationComposition, serviceTemplate);
} else {
LOGGER.debug("automation composition scan: transition state {} {} not completed",
automationComposition.getDeployState(), automationComposition.getLockState());
- if (DeployState.UPDATING.equals(automationComposition.getDeployState())
- || DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
- // UPDATING do not need phases
- handleTimeoutUpdate(automationComposition);
- return;
- }
-
var isForward =
AcmUtils.isForward(automationComposition.getDeployState(), automationComposition.getLockState());
var nextSpNotCompleted = isForward ? minSpNotCompleted : maxSpNotCompleted;
if (nextSpNotCompleted != automationComposition.getPhase()) {
- sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted, false);
+ sendAutomationCompositionMsg(automationComposition, serviceTemplate, nextSpNotCompleted);
+ } else {
+ handleTimeout(automationComposition, serviceTemplate);
+ }
+ }
+ }
+
+ /**
+ * Simple scan: UPDATE, PREPARE, REVIEW, MIGRATE_PRECHECKING.
+ *
+ * @param automationComposition the AutomationComposition
+ * @param serviceTemplate the ToscaServiceTemplate
+ */
+ private void simpleScan(final AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) {
+ var completed = automationComposition.getElements().values().stream()
+ .filter(element -> AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(),
+ element.getSubState())).findFirst().isEmpty();
+
+ if (completed) {
+ complete(automationComposition, serviceTemplate);
+ } else {
+ handleTimeout(automationComposition, serviceTemplate);
+ }
+ }
+
+ /**
+ * Scan with stage: MIGRATE.
+ *
+ * @param automationComposition the AutomationComposition
+ * @param serviceTemplate the ToscaServiceTemplate
+ */
+ private void scanStage(final AutomationComposition automationComposition, ToscaServiceTemplate serviceTemplate) {
+ var completed = true;
+ var minStageNotCompleted = 1000; // min stage not completed
+ for (var element : automationComposition.getElements().values()) {
+ if (AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState(),
+ element.getSubState())) {
+ var toscaNodeTemplate = serviceTemplate.getToscaTopologyTemplate().getNodeTemplates()
+ .get(element.getDefinition().getName());
+ var stageSet = ParticipantUtils.findStageSet(toscaNodeTemplate.getProperties());
+ var minStage = stageSet.stream().min(Comparator.comparing(Integer::valueOf)).orElse(0);
+ int stage = element.getStage() != null ? element.getStage() : minStage;
+ minStageNotCompleted = Math.min(minStageNotCompleted, stage);
+ completed = false;
+ }
+ }
+
+ if (completed) {
+ complete(automationComposition, serviceTemplate);
+ } else {
+ LOGGER.debug("automation composition scan: transition from state {} to {} not completed",
+ automationComposition.getDeployState(), automationComposition.getLockState());
+
+ if (minStageNotCompleted != automationComposition.getPhase()) {
+ savePhase(automationComposition, minStageNotCompleted);
+ LOGGER.debug("retry message AutomationCompositionMigration");
+ automationCompositionMigrationPublisher.send(automationComposition, minStageNotCompleted);
} else {
- handleTimeoutWithPhase(automationComposition, serviceTemplate);
+ handleTimeout(automationComposition, serviceTemplate);
}
}
}
- private void complete(final AutomationComposition automationComposition) {
+ private void complete(final AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate) {
+ LOGGER.debug("automation composition scan: transition state {} {} {} completed",
+ automationComposition.getDeployState(), automationComposition.getLockState(),
+ automationComposition.getSubState());
+
var deployState = automationComposition.getDeployState();
if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
// migration scenario
@@ -193,14 +274,18 @@ public class SupervisionScanner {
automationComposition.setDeployState(AcmUtils.deployCompleted(deployState));
automationComposition.setLockState(AcmUtils.lockCompleted(deployState, automationComposition.getLockState()));
automationComposition.setPhase(null);
+ automationComposition.setSubState(SubState.NONE);
+ automationComposition.setPrecheck(null);
if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) {
automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
}
+ var acToUpdate = automationComposition;
if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
automationCompositionProvider.deleteAutomationComposition(automationComposition.getInstanceId());
} else {
- automationCompositionProvider.updateAutomationComposition(automationComposition);
+ acToUpdate = automationCompositionProvider.updateAcState(acToUpdate);
}
+ participantSyncPublisher.sendSync(serviceTemplate, acToUpdate);
}
private void handleTimeout(AutomationCompositionDefinition acDefinition) {
@@ -214,71 +299,48 @@ public class SupervisionScanner {
LOGGER.debug("Report timeout for the ac definition {}", acDefinition.getCompositionId());
acDefinition.setStateChangeResult(StateChangeResult.TIMEOUT);
acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(),
- acDefinition.getState(), acDefinition.getStateChangeResult(), acDefinition.getRestarting());
- }
- }
-
- private void handleTimeoutUpdate(AutomationComposition automationComposition) {
- if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) {
- LOGGER.debug("The ac instance is in timeout {}", automationComposition.getInstanceId());
- return;
- }
- var now = TimestampHelper.nowEpochMilli();
- var lastMsg = TimestampHelper.toEpochMilli(automationComposition.getLastMsg());
- for (var element : automationComposition.getElements().values()) {
- if (!AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) {
- continue;
- }
- if ((now - lastMsg) > maxStatusWaitMs) {
- LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId());
- automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT);
- automationCompositionProvider.updateAutomationComposition(automationComposition);
- break;
- }
+ acDefinition.getState(), acDefinition.getStateChangeResult());
+ participantSyncPublisher.sendSync(acDefinition, null);
}
}
- private void handleTimeoutWithPhase(AutomationComposition automationComposition,
+ private void handleTimeout(AutomationComposition automationComposition,
ToscaServiceTemplate serviceTemplate) {
+ LOGGER.debug("automation composition scan: transition from state {} to {} {} not completed",
+ automationComposition.getDeployState(), automationComposition.getLockState(),
+ automationComposition.getSubState());
+
if (StateChangeResult.TIMEOUT.equals(automationComposition.getStateChangeResult())) {
LOGGER.debug("The ac instance is in timeout {}", automationComposition.getInstanceId());
return;
}
- int currentPhase = automationComposition.getPhase();
var now = TimestampHelper.nowEpochMilli();
var lastMsg = TimestampHelper.toEpochMilli(automationComposition.getLastMsg());
- for (var element : automationComposition.getElements().values()) {
- if (!AcmUtils.isInTransitionalState(element.getDeployState(), element.getLockState())) {
- continue;
- }
- var toscaNodeTemplate = serviceTemplate.getToscaTopologyTemplate().getNodeTemplates()
- .get(element.getDefinition().getName());
- int startPhase = ParticipantUtils.findStartPhase(toscaNodeTemplate.getProperties());
- if (currentPhase != startPhase) {
- continue;
- }
- if ((now - lastMsg) > maxStatusWaitMs) {
- LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId());
- automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT);
- automationCompositionProvider.updateAutomationComposition(automationComposition);
- break;
- }
+ if ((now - lastMsg) > maxStatusWaitMs) {
+ LOGGER.debug("Report timeout for the ac instance {}", automationComposition.getInstanceId());
+ automationComposition.setStateChangeResult(StateChangeResult.TIMEOUT);
+ automationCompositionProvider.updateAcState(automationComposition);
+ participantSyncPublisher.sendSync(serviceTemplate, automationComposition);
}
}
- private void sendAutomationCompositionMsg(AutomationComposition automationComposition,
- ToscaServiceTemplate serviceTemplate, int startPhase, boolean firstStartPhase) {
+ private void savePhase(AutomationComposition automationComposition, int startPhase) {
automationComposition.setLastMsg(TimestampHelper.now());
automationComposition.setPhase(startPhase);
- automationCompositionProvider.updateAutomationComposition(automationComposition);
+ automationCompositionProvider.updateAcState(automationComposition);
+ }
+
+ private void sendAutomationCompositionMsg(AutomationComposition automationComposition,
+ ToscaServiceTemplate serviceTemplate, int startPhase) {
+ savePhase(automationComposition, startPhase);
if (DeployState.DEPLOYING.equals(automationComposition.getDeployState())) {
- LOGGER.debug("retry message AutomationCompositionUpdate");
+ LOGGER.debug("retry message AutomationCompositionDeploy");
automationCompositionDeployPublisher.send(automationComposition, serviceTemplate, startPhase,
- firstStartPhase);
+ false);
} else {
LOGGER.debug("retry message AutomationCompositionStateChange");
- automationCompositionStateChangePublisher.send(automationComposition, startPhase, firstStartPhase);
+ automationCompositionStateChangePublisher.send(automationComposition, startPhase, false);
}
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
index 5014f7dc3..19130c225 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
@@ -21,14 +21,11 @@
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
-import java.util.List;
-import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMessage> implements Publisher {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
index 5afb7eba4..d3222caf5 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
@@ -21,14 +21,11 @@
package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
-import java.util.List;
-import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.message.bus.event.TopicSink;
+import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java
index 338f2960d..cc5d1461f 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcElementPropertiesPublisher.java
@@ -56,7 +56,7 @@ public class AcElementPropertiesPublisher extends AbstractParticipantPublisher<P
propertiesUpdate.setParticipantUpdatesList(
AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.UPDATE));
- LOGGER.debug("AC Element properties update sent {}", propertiesUpdate);
+ LOGGER.debug("AC Element properties update sent {}", propertiesUpdate.getMessageId());
super.send(propertiesUpdate);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java
new file mode 100644
index 000000000..acf403595
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AcPreparePublisher.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision.comm;
+
+import io.micrometer.core.annotation.Timed;
+import java.time.Instant;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionPrepare;
+import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+@AllArgsConstructor
+public class AcPreparePublisher extends AbstractParticipantPublisher<AutomationCompositionPrepare> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AcPreparePublisher.class);
+
+ /**
+ * Send AutomationCompositionPrepare Prepare message to Participant.
+ *
+ * @param automationComposition the AutomationComposition
+ */
+ @Timed(value = "publisher.prepare", description = "AC Prepare Pre Deploy published")
+ public void sendPrepare(AutomationComposition automationComposition) {
+ var acPrepare = createAutomationCompositionPrepare(automationComposition.getCompositionId(),
+ automationComposition.getInstanceId());
+ acPrepare.setParticipantList(
+ AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.NONE));
+ LOGGER.debug("AC Prepare sent {}", acPrepare);
+ super.send(acPrepare);
+ }
+
+ /**
+ * Send AutomationCompositionPrepare Review message to Participant.
+ *
+ * @param automationComposition the AutomationComposition
+ */
+ @Timed(value = "publisher.review", description = "AC Review Post Deploy published")
+ public void sendRevew(AutomationComposition automationComposition) {
+ var acPrepare = createAutomationCompositionPrepare(automationComposition.getCompositionId(),
+ automationComposition.getInstanceId());
+ acPrepare.setPreDeploy(false);
+ LOGGER.debug("AC Review sent {}", acPrepare);
+ super.send(acPrepare);
+ }
+
+ private AutomationCompositionPrepare createAutomationCompositionPrepare(UUID compositionId, UUID instanceId) {
+ var acPrepare = new AutomationCompositionPrepare();
+ acPrepare.setCompositionId(compositionId);
+ acPrepare.setAutomationCompositionId(instanceId);
+ acPrepare.setMessageId(UUID.randomUUID());
+ acPrepare.setTimestamp(Instant.now());
+ return acPrepare;
+ }
+}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java
index 2b6435e67..7fe63a7c5 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionDeployPublisher.java
@@ -87,7 +87,7 @@ public class AutomationCompositionDeployPublisher extends AbstractParticipantPub
acDeployMsg.setTimestamp(Instant.now());
acDeployMsg.setParticipantUpdatesList(participantDeploys);
- LOGGER.debug("AutomationCompositionDeploy message sent {}", acDeployMsg);
+ LOGGER.debug("AutomationCompositionDeploy message sent {}", acDeployMsg.getMessageId());
super.send(acDeployMsg);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java
index e26a5403b..572f7b1fe 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionMigrationPublisher.java
@@ -36,20 +36,22 @@ public class AutomationCompositionMigrationPublisher
* Send AutomationCompositionMigration message to Participant.
*
* @param automationComposition the AutomationComposition
- * @param compositionTargetId the Composition Definition Target
+ * @param stage the stage to execute
*/
@Timed(
value = "publisher.automation_composition_migration",
description = "AUTOMATION_COMPOSITION_MIGRATION messages published")
- public void send(AutomationComposition automationComposition, UUID compositionTargetId) {
- var acsc = new AutomationCompositionMigration();
- acsc.setCompositionId(automationComposition.getCompositionId());
- acsc.setAutomationCompositionId(automationComposition.getInstanceId());
- acsc.setMessageId(UUID.randomUUID());
- acsc.setCompositionTargetId(compositionTargetId);
- acsc.setParticipantUpdatesList(
+ public void send(AutomationComposition automationComposition, int stage) {
+ var acMigration = new AutomationCompositionMigration();
+ acMigration.setPrecheck(Boolean.TRUE.equals(automationComposition.getPrecheck()));
+ acMigration.setCompositionId(automationComposition.getCompositionId());
+ acMigration.setAutomationCompositionId(automationComposition.getInstanceId());
+ acMigration.setMessageId(UUID.randomUUID());
+ acMigration.setCompositionTargetId(automationComposition.getCompositionTargetId());
+ acMigration.setStage(stage);
+ acMigration.setParticipantUpdatesList(
AcmUtils.createParticipantDeployList(automationComposition, DeployOrder.MIGRATE));
- super.send(acsc);
+ super.send(acMigration);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java
index faca6a897..a0b74b574 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionStateChangeAckListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java
index aceec0400..551a27c86 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AutomationCompositionUpdateAckListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java
index a4e400263..1649c2543 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantDeregisterListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java
index 55682d154..a6a3ab648 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimeAckListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
index 89763a2b6..360e526cc 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class);
private final ParticipantProvider participantProvider;
- private final AcmParticipantProvider acmParticipantProvider;
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
/**
@@ -74,7 +72,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
message.setParticipantId(participantId);
message.setTimestamp(Instant.now());
message.setParticipantDefinitionUpdates(participantDefinitions);
- LOGGER.debug("Participant Update sent {}", message);
+ LOGGER.debug("Participant Update sent {}", message.getMessageId());
super.send(message);
}
@@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
participantIds.add(elementState.getParticipantId());
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, elementState.getParticipantId());
+ supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId());
}
} else {
// scenario Prime participants not assigned yet
@@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
for (var elementEntry : acElements) {
var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
elementState.setState(AcTypeState.PRIMING);
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- var participantId = supportedElementMap.get(type);
+ var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue()));
if (participantId != null) {
elementState.setParticipantId(participantId);
participantIds.add(participantId);
}
}
}
- acmParticipantProvider.verifyParticipantState(participantIds);
+ participantProvider.verifyParticipantState(participantIds);
return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap);
}
@@ -133,7 +127,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
// DeCommission the automation composition but deleting participantdefinitions on participants
message.setParticipantDefinitionUpdates(null);
- LOGGER.debug("Participant Update sent {}", message);
+ LOGGER.debug("Participant Update sent {}", message.getMessageId());
super.send(message);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
index d021c57a4..5e073080b 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterAckPublisher.java
@@ -36,14 +36,21 @@ public class ParticipantRegisterAckPublisher extends AbstractParticipantAckPubli
*
* @param responseTo the original request id in the request.
* @param participantId the participant Id
+ * @param replicaId the participant replica Id
*/
@Timed(value = "publisher.participant_register_ack", description = "PARTICIPANT_REGISTER_ACK messages published")
- public void send(UUID responseTo, UUID participantId) {
+ public void send(UUID responseTo, UUID participantId, UUID replicaId) {
var message = new ParticipantRegisterAck();
message.setParticipantId(participantId);
+ message.setReplicaId(replicaId);
message.setResponseTo(responseTo);
message.setMessage("Participant Register Ack");
message.setResult(true);
super.send(message);
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java
index 3b7db1c7d..d78cec7db 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRegisterListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
deleted file mode 100644
index 4f28eab8e..000000000
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.acm.runtime.supervision.comm;
-
-import io.micrometer.core.annotation.Timed;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
-import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-@Component
-@AllArgsConstructor
-public class ParticipantRestartPublisher extends AbstractParticipantPublisher<ParticipantRestart> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantRestartPublisher.class);
- private final AcRuntimeParameterGroup acRuntimeParameterGroup;
-
- /**
- * Send Restart to Participant.
- *
- * @param participantId the ParticipantId
- * @param acmDefinition the AutomationComposition Definition
- * @param automationCompositions the list of automationCompositions
- */
- @Timed(value = "publisher.participant_restart", description = "Participant Restart published")
- public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
- List<AutomationComposition> automationCompositions) {
-
- var message = new ParticipantRestart();
- message.setParticipantId(participantId);
- message.setCompositionId(acmDefinition.getCompositionId());
- message.setMessageId(UUID.randomUUID());
- message.setTimestamp(Instant.now());
- message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
- var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
-
- for (var automationComposition : automationCompositions) {
- var restartAc = new ParticipantRestartAc();
- restartAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementRestart = AcmUtils.createAcElementRestart(element);
- acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- restartAc.getAcElementList().add(acElementRestart);
- }
- }
- message.getAutomationcompositionList().add(restartAc);
- }
-
- LOGGER.debug("Participant Restart sent {}", message);
- super.send(message);
- }
-
- protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
- AutomationCompositionDefinition acmDefinition) {
- var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
- acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
-
- // list of entry filtered by participantId
- List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
- Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
- for (var elementEntry : acElements) {
- var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
- if (participantId.equals(elementState.getParticipantId())) {
- var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
- elementEntry.getValue().getTypeVersion());
- supportedElementMap.put(type, participantId);
- elementList.add(elementEntry);
- }
- }
- var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap);
- for (var participantDefinition : list) {
- for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) {
- var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName());
- if (state != null) {
- elementDe.setOutProperties(state.getOutProperties());
- }
- }
- }
- return list;
- }
-}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java
index 700e61df3..0a7a5eb93 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusListener.java
@@ -25,8 +25,8 @@ import org.onap.policy.clamp.acm.runtime.config.messaging.Listener;
import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
index 76feee72f..96abac494 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
@@ -40,11 +40,16 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<
*/
@Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
public void send(UUID participantId) {
- ParticipantStatusReq message = new ParticipantStatusReq();
+ var message = new ParticipantStatusReq();
message.setParticipantId(participantId);
message.setTimestamp(Instant.now());
- LOGGER.debug("Participant StatusReq sent {}", message);
+ LOGGER.debug("Participant StatusReq sent {}", message.getMessageId());
super.send(message);
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
index ae7eda1ee..d90b6f667 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import io.micrometer.core.annotation.Timed;
import java.time.Instant;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
+import lombok.AllArgsConstructor;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-
@Component
-public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+@AllArgsConstructor
+public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
-
private final AcRuntimeParameterGroup acRuntimeParameterGroup;
- public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
- super(acRuntimeParameterGroup);
- this.acRuntimeParameterGroup = acRuntimeParameterGroup;
- }
-
-
/**
- * Send sync msg to Participant.
+ * Send Restart sync msg to Participant by participantId.
*
- * @param participantId the ParticipantId
+ * @param participantId the participantId
+ * @param replicaId the replicaId
* @param acmDefinition the AutomationComposition Definition
* @param automationCompositions the list of automationCompositions
*/
- @Override
@Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
- public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition,
List<AutomationComposition> automationCompositions) {
var message = new ParticipantSync();
message.setParticipantId(participantId);
+ message.setReplicaId(replicaId);
+ message.setRestarting(true);
message.setCompositionId(acmDefinition.getCompositionId());
message.setMessageId(UUID.randomUUID());
message.setTimestamp(Instant.now());
message.setState(acmDefinition.getState());
- message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
for (var automationComposition : automationCompositions) {
- var syncAc = new ParticipantRestartAc();
- syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
- for (var element : automationComposition.getElements().values()) {
- if (participantId.equals(element.getParticipantId())) {
- var acElementSync = AcmUtils.createAcElementRestart(element);
- acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
- syncAc.getAcElementList().add(acElementSync);
- }
- }
+ var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
message.getAutomationcompositionList().add(syncAc);
}
- LOGGER.debug("Participant Sync sent {}", message);
+ LOGGER.debug("Participant Restarting Sync sent {}", message);
super.send(message);
}
@@ -98,4 +87,66 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
return false;
}
+ /**
+ * Send AutomationCompositionDefinition sync msg to all Participants.
+ *
+ * @param acDefinition the AutomationComposition Definition
+ * @param excludeReplicaId the replica to be excluded
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) {
+ var message = new ParticipantSync();
+ message.setCompositionId(acDefinition.getCompositionId());
+ if (excludeReplicaId != null) {
+ message.getExcludeReplicas().add(excludeReplicaId);
+ }
+ message.setState(acDefinition.getState());
+ message.setStateChangeResult(acDefinition.getStateChangeResult());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+ message.setDelete(true);
+ } else {
+ message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
+ acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+ }
+ LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Send AutomationComposition sync msg to all Participants.
+ *
+ * @param serviceTemplate the ServiceTemplate
+ * @param automationComposition the automationComposition
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) {
+ var message = new ParticipantSync();
+ message.setCompositionId(automationComposition.getCompositionId());
+ message.setAutomationCompositionId(automationComposition.getInstanceId());
+ message.setState(AcTypeState.PRIMED);
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ syncAc.setDeployState(automationComposition.getDeployState());
+ syncAc.setLockState(automationComposition.getLockState());
+ syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
+ if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
+ message.setDelete(true);
+ } else {
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate);
+ for (var element : automationComposition.getElements().values()) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+
+ LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId());
+ super.send(message);
+ }
}
diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml
index 0e2585dba..bca01ace8 100644
--- a/runtime-acm/src/main/resources/application.yaml
+++ b/runtime-acm/src/main/resources/application.yaml
@@ -44,21 +44,18 @@ runtime:
maxStatusWaitMs: 200000
topicParameterGroup:
topicSources:
- -
- topic: ${runtime.topics.operationTopic}
+ - topic: ${runtime.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
- -
- topic: ${runtime.topics.operationTopic}
+ - topic: ${runtime.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
- -
- topic: ${runtime.topics.syncTopic}
+ - topic: ${runtime.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP