diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2024-06-14 14:10:58 +0100 |
---|---|---|
committer | Francesco Fiora <francesco.fiora@est.tech> | 2024-06-17 09:04:18 +0000 |
commit | ca2ee94054c580827fcfc7f07c9db641301d6b9a (patch) | |
tree | 837edef253934aa10a09ecafa5819035b9ca0097 /participant/participant-impl/participant-impl-kserve/src/main | |
parent | b52e095b34ee7c576f7ee83df05e2a09366a8c8a (diff) |
Remove restarting implementation from participants
Remove restarting implementation from participants and
Remove local Map from a1pms and kserve participants.
Issue-ID: POLICY-5046
Change-Id: I9cc2a33d603751c60007475414b45ca54f0aac25
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-impl/participant-impl-kserve/src/main')
-rw-r--r-- | participant/participant-impl/participant-impl-kserve/src/main/java/org/onap/policy/clamp/acm/participant/kserve/handler/AutomationCompositionElementHandler.java | 147 |
1 files changed, 70 insertions, 77 deletions
diff --git a/participant/participant-impl/participant-impl-kserve/src/main/java/org/onap/policy/clamp/acm/participant/kserve/handler/AutomationCompositionElementHandler.java b/participant/participant-impl/participant-impl-kserve/src/main/java/org/onap/policy/clamp/acm/participant/kserve/handler/AutomationCompositionElementHandler.java index d4b09c923..d9c932efd 100644 --- a/participant/participant-impl/participant-impl-kserve/src/main/java/org/onap/policy/clamp/acm/participant/kserve/handler/AutomationCompositionElementHandler.java +++ b/participant/participant-impl/participant-impl-kserve/src/main/java/org/onap/policy/clamp/acm/participant/kserve/handler/AutomationCompositionElementHandler.java @@ -26,28 +26,23 @@ import jakarta.validation.Validation; import jakarta.validation.ValidationException; import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.HashMap; import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import lombok.AccessLevel; -import lombok.Getter; import org.apache.http.HttpStatus; +import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto; +import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto; import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; -import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV1; +import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV2; import org.onap.policy.clamp.acm.participant.kserve.exception.KserveException; import org.onap.policy.clamp.acm.participant.kserve.k8s.InferenceServiceValidator; import org.onap.policy.clamp.acm.participant.kserve.k8s.KserveClient; import org.onap.policy.clamp.acm.participant.kserve.models.ConfigurationEntity; import org.onap.policy.clamp.acm.participant.kserve.models.KserveInferenceEntity; -import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; import org.onap.policy.clamp.models.acm.concepts.DeployState; -import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; -import org.onap.policy.clamp.models.acm.utils.AcmUtils; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; @@ -60,20 +55,17 @@ import org.springframework.stereotype.Component; * This class handles implementation of automationCompositionElement updates. */ @Component -public class AutomationCompositionElementHandler extends AcElementListenerV1 { +public class AutomationCompositionElementHandler extends AcElementListenerV2 { private static final Coder CODER = new StandardCoder(); private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private ExecutorService executor = Context.taskWrapping( + private final ExecutorService executor = Context.taskWrapping( Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); private final KserveClient kserveClient; - @Getter(AccessLevel.PACKAGE) - private final Map<UUID, ConfigurationEntity> configRequestMap = new ConcurrentHashMap<>(); - public AutomationCompositionElementHandler(ParticipantIntermediaryApi intermediaryApi, KserveClient kserveClient) { super(intermediaryApi); this.kserveClient = kserveClient; @@ -86,20 +78,25 @@ public class AutomationCompositionElementHandler extends AcElementListenerV1 { } @Override - public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) { - var configurationEntity = configRequestMap.get(automationCompositionElementId); + public void undeploy(CompositionElementDto compositionElement, InstanceElementDto instanceElement) + throws PfModelException { + Map<String, Object> properties = new HashMap<>(compositionElement.inProperties()); + properties.putAll(instanceElement.inProperties()); + var configurationEntity = getConfigurationEntity(properties); if (configurationEntity != null) { try { for (KserveInferenceEntity kserveInferenceEntity : configurationEntity.getKserveInferenceEntities()) { kserveClient.undeployInferenceService(kserveInferenceEntity.getNamespace(), kserveInferenceEntity.getName()); } - configRequestMap.remove(automationCompositionElementId); - intermediaryApi.updateAutomationCompositionElementState(automationCompositionId, - automationCompositionElementId, DeployState.UNDEPLOYED, null, StateChangeResult.NO_ERROR, - "Undeployed"); + intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), + instanceElement.elementId(), DeployState.UNDEPLOYED, null, + StateChangeResult.NO_ERROR, "Undeployed"); } catch (IOException | ApiException exception) { LOGGER.warn("Deletion of Inference service failed", exception); + intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), + instanceElement.elementId(), DeployState.DEPLOYED, null, + StateChangeResult.FAILED, "Undeploy Failed"); } } } @@ -107,49 +104,71 @@ public class AutomationCompositionElementHandler extends AcElementListenerV1 { /** * Callback method to handle an update on an automation composition element. * - * @param automationCompositionId the ID of the automation composition - * @param element the information on the automation composition element - * @param properties properties Map + * @param compositionElement the information of the Automation Composition Definition Element + * @param instanceElement the information of the Automation Composition Instance Element + * @throws PfModelException if error occurs */ @Override - public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties) + public void deploy(CompositionElementDto compositionElement, InstanceElementDto instanceElement) throws PfModelException { + Map<String, Object> properties = new HashMap<>(compositionElement.inProperties()); + properties.putAll(instanceElement.inProperties()); try { - var configurationEntity = CODER.convert(properties, ConfigurationEntity.class); - var violations = Validation.buildDefaultValidatorFactory().getValidator().validate(configurationEntity); - if (violations.isEmpty()) { - boolean isAllInferenceSvcDeployed = true; - var config = CODER.convert(properties, ThreadConfig.class); - for (KserveInferenceEntity kserveInferenceEntity : configurationEntity.getKserveInferenceEntities()) { - kserveClient.deployInferenceService(kserveInferenceEntity.getNamespace(), - kserveInferenceEntity.getPayload()); - - if (!checkInferenceServiceStatus(kserveInferenceEntity.getName(), - kserveInferenceEntity.getNamespace(), config.uninitializedToPassiveTimeout, - config.statusCheckInterval)) { - isAllInferenceSvcDeployed = false; - break; - } - } - if (isAllInferenceSvcDeployed) { - configRequestMap.put(element.getId(), configurationEntity); - intermediaryApi.updateAutomationCompositionElementState(automationCompositionId, element.getId(), - DeployState.DEPLOYED, null, StateChangeResult.NO_ERROR, "Deployed"); - } else { - LOGGER.error("Inference Service deployment failed"); + var configurationEntity = getConfigurationEntity(properties); + boolean isAllInferenceSvcDeployed = true; + var config = getThreadConfig(properties); + for (var kserveInferenceEntity : configurationEntity.getKserveInferenceEntities()) { + kserveClient.deployInferenceService(kserveInferenceEntity.getNamespace(), + kserveInferenceEntity.getPayload()); + + if (!checkInferenceServiceStatus(kserveInferenceEntity.getName(), + kserveInferenceEntity.getNamespace(), config.uninitializedToPassiveTimeout, + config.statusCheckInterval)) { + isAllInferenceSvcDeployed = false; + break; } + } + if (isAllInferenceSvcDeployed) { + intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), + instanceElement.elementId(), DeployState.DEPLOYED, null, + StateChangeResult.NO_ERROR, "Deployed"); } else { - LOGGER.error("Violations found in the config request parameters: {}", violations); - throw new ValidationException("Constraint violations in the config request"); + LOGGER.error("Inference Service deployment failed"); + intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), + instanceElement.elementId(), DeployState.UNDEPLOYED, null, + StateChangeResult.FAILED, "Deploy Failed"); } - } catch (CoderException e) { - throw new KserveException(HttpStatus.SC_BAD_REQUEST, "Invalid inference service configuration", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new KserveException("Interrupt in configuring the inference service", e); } catch (IOException | ExecutionException | ApiException e) { throw new KserveException("Failed to configure the inference service", e); } + + } + + private ConfigurationEntity getConfigurationEntity(Map<String, Object> properties) throws KserveException { + try { + var configurationEntity = CODER.convert(properties, ConfigurationEntity.class); + try (var validatorFactory = Validation.buildDefaultValidatorFactory()) { + var violations = validatorFactory.getValidator().validate(configurationEntity); + if (!violations.isEmpty()) { + LOGGER.error("Violations found in the config request parameters: {}", violations); + throw new ValidationException("Constraint violations in the config request"); + } + } + return configurationEntity; + } catch (CoderException e) { + throw new KserveException(HttpStatus.SC_BAD_REQUEST, "Invalid inference service configuration", e); + } + } + + private ThreadConfig getThreadConfig(Map<String, Object> properties) throws KserveException { + try { + return CODER.convert(properties, ThreadConfig.class); + } catch (CoderException e) { + throw new KserveException(HttpStatus.SC_BAD_REQUEST, "Invalid inference service configuration", e); + } } /** @@ -166,34 +185,8 @@ public class AutomationCompositionElementHandler extends AcElementListenerV1 { public boolean checkInferenceServiceStatus(String inferenceServiceName, String namespace, int timeout, int statusCheckInterval) throws ExecutionException, InterruptedException { // Invoke runnable thread to check pod status - Future<String> result = executor.submit(new InferenceServiceValidator(inferenceServiceName, namespace, timeout, + var result = executor.submit(new InferenceServiceValidator(inferenceServiceName, namespace, timeout, statusCheckInterval, kserveClient), "Done"); return (!result.get().isEmpty()) && result.isDone(); } - - @Override - public void handleRestartInstance(UUID automationCompositionId, AcElementDeploy element, - Map<String, Object> properties, DeployState deployState, LockState lockState) throws PfModelException { - if (DeployState.DEPLOYING.equals(deployState)) { - deploy(automationCompositionId, element, properties); - return; - } - if (DeployState.UNDEPLOYING.equals(deployState) || DeployState.DEPLOYED.equals(deployState) - || DeployState.UPDATING.equals(deployState)) { - try { - var configurationEntity = CODER.convert(properties, ConfigurationEntity.class); - configRequestMap.put(element.getId(), configurationEntity); - } catch (CoderException e) { - throw new KserveException(HttpStatus.SC_BAD_REQUEST, "Invalid inference service configuration", e); - } - } - if (DeployState.UNDEPLOYING.equals(deployState)) { - undeploy(automationCompositionId, element.getId()); - return; - } - deployState = AcmUtils.deployCompleted(deployState); - lockState = AcmUtils.lockCompleted(deployState, lockState); - intermediaryApi.updateAutomationCompositionElementState(automationCompositionId, element.getId(), deployState, - lockState, StateChangeResult.NO_ERROR, "Restarted"); - } } |