From 658e67bc821a3bc55f2c6d877e7e0baa21427333 Mon Sep 17 00:00:00 2001 From: "a.sreekumar" Date: Mon, 25 Jan 2021 11:03:42 +0000 Subject: Improve handling of multiple policy in APEX PDP Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39 Issue-ID: POLICY-2883 Signed-off-by: a.sreekumar --- .../services/onappf/handler/ApexEngineHandler.java | 203 ++++++++++++++++++--- 1 file changed, 175 insertions(+), 28 deletions(-) (limited to 'services/services-onappf/src') diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java index 4d560a3bb..03e97215b 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,13 +24,36 @@ package org.onap.policy.apex.services.onappf.handler; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.onap.policy.apex.core.engine.EngineParameters; +import org.onap.policy.apex.core.engine.TaskParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicies; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicy; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; +import org.onap.policy.apex.model.policymodel.concepts.AxTasks; import org.onap.policy.apex.service.engine.main.ApexMain; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; +import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.services.onappf.exception.ApexStarterException; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -49,7 +72,7 @@ public class ApexEngineHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class); - private ApexMain apexMain; + private Map apexMainMap; /** * Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine. @@ -57,13 +80,13 @@ public class ApexEngineHandler { * @param policies the list of policies * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ - public ApexEngineHandler(List policies) throws ApexStarterException { - Map policyArgsMap = createPolicyArgsMap(policies); + public ApexEngineHandler(List policies) throws ApexStarterException { LOGGER.debug("Starting apex engine."); - try { - apexMain = new ApexMain(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + apexMainMap = initiateApexEngineForPolicies(policies); + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + throw new ApexStarterException("Apex Engine failed to start."); } } @@ -74,20 +97,136 @@ public class ApexEngineHandler { * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ public void updateApexEngine(List policies) throws ApexStarterException { - if (null == apexMain || !apexMain.isAlive()) { - throw new ApexStarterException("Apex Engine not initialized."); + List runningPolicies = getRunningPolicies(); + List policiesToDeploy = policies.stream() + .filter(policy -> !runningPolicies.contains(policy.getIdentifier())).collect(Collectors.toList()); + List policiesToUnDeploy = runningPolicies.stream() + .filter(polId -> policies.stream().noneMatch(policy -> policy.getIdentifier().equals(polId))) + .collect(Collectors.toList()); + Map undeployedPoliciesMainMap = new LinkedHashMap<>(); + policiesToUnDeploy.forEach(policyId -> { + ApexMain apexMain = apexMainMap.get(policyId); + try { + apexMain.shutdown(); + undeployedPoliciesMainMap.put(policyId, apexMain); + apexMainMap.remove(policyId); + } catch (ApexException e) { + LOGGER.error("Shutting down policy {} failed", policyId, e); + } + }); + if (!undeployedPoliciesMainMap.isEmpty()) { + updateModelAndParameterServices(undeployedPoliciesMainMap); } - Map policyArgsMap = createPolicyArgsMap(policies); - try { - apexMain.updateModel(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + if (!policiesToDeploy.isEmpty()) { + Map mainMap = initiateApexEngineForPolicies(policiesToDeploy); + if (mainMap.isEmpty()) { + throw new ApexStarterException("Updating the APEX engine with new policies failed."); + } + apexMainMap.putAll(mainMap); + } + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + } + } + + /** + * Clear the corresponding items from ModelService and ParameterService. + * + * @param undeployedPoliciesMainMap the policies that are undeployed + */ + private void updateModelAndParameterServices(Map undeployedPoliciesMainMap) { + Set inputParamKeysToRetain = new HashSet<>(); + Set outputParamKeysToRetain = new HashSet<>(); + List taskParametersToRetain = new ArrayList<>(); + List executorParamKeysToRetain = new ArrayList<>(); + List schemaParamKeysToRetain = new ArrayList<>(); + + List keyInfoKeystoRetain = new ArrayList<>(); + List schemaKeystoRetain = new ArrayList<>(); + List eventKeystoRetain = new ArrayList<>(); + List albumKeystoRetain = new ArrayList<>(); + List taskKeystoRetain = new ArrayList<>(); + List policyKeystoRetain = new ArrayList<>(); + + apexMainMap.values().forEach(main -> { + inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet()); + outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet()); + taskParametersToRetain.addAll( + main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters()); + executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getExecutorParameterMap().keySet()); + schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()); + + keyInfoKeystoRetain + .addAll(main.getActivator().getPolicyModel().getKeyInformation().getKeyInfoMap().keySet()); + schemaKeystoRetain.addAll(main.getActivator().getPolicyModel().getSchemas().getSchemasMap().keySet()); + eventKeystoRetain.addAll(main.getActivator().getPolicyModel().getEvents().getEventMap().keySet()); + albumKeystoRetain.addAll(main.getActivator().getPolicyModel().getAlbums().getAlbumsMap().keySet()); + taskKeystoRetain.addAll(main.getActivator().getPolicyModel().getTasks().getTaskMap().keySet()); + policyKeystoRetain.addAll(main.getActivator().getPolicyModel().getPolicies().getPolicyMap().keySet()); + }); + for (ApexMain main : undeployedPoliciesMainMap.values()) { + ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + List eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet() + .stream().filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + List eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet() + .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove); + eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove); + EngineParameters engineParameters = + main.getApexParameters().getEngineServiceParameters().getEngineParameters(); + final List taskParametersToRemove = engineParameters.getTaskParameters().stream() + .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList()); + final List executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream() + .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList()); + final List schemaParamKeysToRemove = + engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet() + .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList()); + EngineParameters aggregatedEngineParameters = + existingParameters.getEngineServiceParameters().getEngineParameters(); + aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove); + executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove); + schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters() + .getSchemaHelperParameterMap()::remove); + + final AxPolicyModel policyModel = main.getActivator().getPolicyModel(); + final List keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet() + .stream().filter(key -> !keyInfoKeystoRetain.contains(key)).collect(Collectors.toList()); + final List schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream() + .filter(key -> !schemaKeystoRetain.contains(key)).collect(Collectors.toList()); + final List eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream() + .filter(key -> !eventKeystoRetain.contains(key)).collect(Collectors.toList()); + final List albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream() + .filter(key -> !albumKeystoRetain.contains(key)).collect(Collectors.toList()); + final List taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream() + .filter(key -> !taskKeystoRetain.contains(key)).collect(Collectors.toList()); + final List policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream() + .filter(key -> !policyKeystoRetain.contains(key)).collect(Collectors.toList()); + + final Map keyInfoMap = + ModelService.getModel(AxKeyInformation.class).getKeyInfoMap(); + final Map schemasMap = + ModelService.getModel(AxContextSchemas.class).getSchemasMap(); + final Map eventMap = ModelService.getModel(AxEvents.class).getEventMap(); + final Map albumsMap = + ModelService.getModel(AxContextAlbums.class).getAlbumsMap(); + final Map taskMap = ModelService.getModel(AxTasks.class).getTaskMap(); + final Map policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap(); + + keyInfoKeystoRemove.forEach(keyInfoMap::remove); + schemaKeystoRemove.forEach(schemasMap::remove); + eventKeystoRemove.forEach(eventMap::remove); + albumKeystoRemove.forEach(albumsMap::remove); + taskKeystoRemove.forEach(taskMap::remove); + policyKeystoRemove.forEach(policyMap::remove); } } - private Map createPolicyArgsMap(List policies) + private Map initiateApexEngineForPolicies(List policies) throws ApexStarterException { - Map policyArgsMap = new LinkedHashMap<>(); + Map mainMap = new LinkedHashMap<>(); for (ToscaPolicy policy : policies) { String policyName = policy.getIdentifier().getName(); final StandardCoder standardCoder = new StandardCoder(); @@ -103,34 +242,38 @@ public class ApexEngineHandler { throw new ApexStarterException(e); } final String[] apexArgs = {"-p", file.getAbsolutePath()}; - policyArgsMap.put(policy.getIdentifier(), apexArgs); + LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier()); + try { + ApexMain apexMain = new ApexMain(apexArgs); + mainMap.put(policy.getIdentifier(), apexMain); + } catch (Exception e) { + LOGGER.error("Execution of policy {} failed", policy.getIdentifier(), e); + } } - return policyArgsMap; + return mainMap; } /** * Method to get the APEX engine statistics. */ public List getEngineStats() { - List engineStats = null; - if (null != apexMain && apexMain.isAlive()) { - engineStats = apexMain.getEngineStats(); - } - return engineStats; + // engineStats from all the apexMain instances running individual tosca policies are combined here. + return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive())) + .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList()); } /** * Method to check whether the apex engine is running or not. */ public boolean isApexEngineRunning() { - return null != apexMain && apexMain.isAlive(); + return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive())); } /** * Method that return the list of running policies in the apex engine. */ public List getRunningPolicies() { - return new ArrayList<>(apexMain.getApexParametersMap().keySet()); + return new ArrayList<>(apexMainMap.keySet()); } /** @@ -139,8 +282,12 @@ public class ApexEngineHandler { public void shutdown() throws ApexStarterException { try { LOGGER.debug("Shutting down apex engine."); - apexMain.shutdown(); - apexMain = null; + for (ApexMain apexMain : apexMainMap.values()) { + apexMain.shutdown(); + } + apexMainMap.clear(); + ModelService.clear(); + ParameterService.clear(); } catch (final ApexException e) { throw new ApexStarterException(e); } -- cgit 1.2.3-korg