diff options
author | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-01-25 11:03:42 +0000 |
---|---|---|
committer | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-01-28 13:14:51 +0000 |
commit | 658e67bc821a3bc55f2c6d877e7e0baa21427333 (patch) | |
tree | 5ab53c6026317858aec8b61bf0aec37459247cf5 /services/services-onappf/src/main | |
parent | 84f92b44e70ce27bb4213da677d50ac91169432c (diff) |
Improve handling of multiple policy in APEX PDP
Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39
Issue-ID: POLICY-2883
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services/services-onappf/src/main')
-rw-r--r-- | services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java | 203 |
1 files changed, 175 insertions, 28 deletions
diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java index 4d560a3bb..03e97215b 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,13 +24,36 @@ package org.onap.policy.apex.services.onappf.handler; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.onap.policy.apex.core.engine.EngineParameters; +import org.onap.policy.apex.core.engine.TaskParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicies; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicy; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; +import org.onap.policy.apex.model.policymodel.concepts.AxTasks; import org.onap.policy.apex.service.engine.main.ApexMain; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; +import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.services.onappf.exception.ApexStarterException; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -49,7 +72,7 @@ public class ApexEngineHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class); - private ApexMain apexMain; + private Map<ToscaConceptIdentifier, ApexMain> apexMainMap; /** * Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine. @@ -57,13 +80,13 @@ public class ApexEngineHandler { * @param policies the list of policies * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ - public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException { - Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies); + public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException { LOGGER.debug("Starting apex engine."); - try { - apexMain = new ApexMain(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + apexMainMap = initiateApexEngineForPolicies(policies); + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + throw new ApexStarterException("Apex Engine failed to start."); } } @@ -74,20 +97,136 @@ public class ApexEngineHandler { * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ public void updateApexEngine(List<ToscaPolicy> policies) throws ApexStarterException { - if (null == apexMain || !apexMain.isAlive()) { - throw new ApexStarterException("Apex Engine not initialized."); + List<ToscaConceptIdentifier> runningPolicies = getRunningPolicies(); + List<ToscaPolicy> policiesToDeploy = policies.stream() + .filter(policy -> !runningPolicies.contains(policy.getIdentifier())).collect(Collectors.toList()); + List<ToscaConceptIdentifier> policiesToUnDeploy = runningPolicies.stream() + .filter(polId -> policies.stream().noneMatch(policy -> policy.getIdentifier().equals(polId))) + .collect(Collectors.toList()); + Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap = new LinkedHashMap<>(); + policiesToUnDeploy.forEach(policyId -> { + ApexMain apexMain = apexMainMap.get(policyId); + try { + apexMain.shutdown(); + undeployedPoliciesMainMap.put(policyId, apexMain); + apexMainMap.remove(policyId); + } catch (ApexException e) { + LOGGER.error("Shutting down policy {} failed", policyId, e); + } + }); + if (!undeployedPoliciesMainMap.isEmpty()) { + updateModelAndParameterServices(undeployedPoliciesMainMap); } - Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies); - try { - apexMain.updateModel(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + if (!policiesToDeploy.isEmpty()) { + Map<ToscaConceptIdentifier, ApexMain> mainMap = initiateApexEngineForPolicies(policiesToDeploy); + if (mainMap.isEmpty()) { + throw new ApexStarterException("Updating the APEX engine with new policies failed."); + } + apexMainMap.putAll(mainMap); + } + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + } + } + + /** + * Clear the corresponding items from ModelService and ParameterService. + * + * @param undeployedPoliciesMainMap the policies that are undeployed + */ + private void updateModelAndParameterServices(Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap) { + Set<String> inputParamKeysToRetain = new HashSet<>(); + Set<String> outputParamKeysToRetain = new HashSet<>(); + List<TaskParameters> taskParametersToRetain = new ArrayList<>(); + List<String> executorParamKeysToRetain = new ArrayList<>(); + List<String> schemaParamKeysToRetain = new ArrayList<>(); + + List<AxArtifactKey> keyInfoKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> schemaKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> eventKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> albumKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> taskKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> policyKeystoRetain = new ArrayList<>(); + + apexMainMap.values().forEach(main -> { + inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet()); + outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet()); + taskParametersToRetain.addAll( + main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters()); + executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getExecutorParameterMap().keySet()); + schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()); + + keyInfoKeystoRetain + .addAll(main.getActivator().getPolicyModel().getKeyInformation().getKeyInfoMap().keySet()); + schemaKeystoRetain.addAll(main.getActivator().getPolicyModel().getSchemas().getSchemasMap().keySet()); + eventKeystoRetain.addAll(main.getActivator().getPolicyModel().getEvents().getEventMap().keySet()); + albumKeystoRetain.addAll(main.getActivator().getPolicyModel().getAlbums().getAlbumsMap().keySet()); + taskKeystoRetain.addAll(main.getActivator().getPolicyModel().getTasks().getTaskMap().keySet()); + policyKeystoRetain.addAll(main.getActivator().getPolicyModel().getPolicies().getPolicyMap().keySet()); + }); + for (ApexMain main : undeployedPoliciesMainMap.values()) { + ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + List<String> eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet() + .stream().filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + List<String> eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet() + .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove); + eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove); + EngineParameters engineParameters = + main.getApexParameters().getEngineServiceParameters().getEngineParameters(); + final List<TaskParameters> taskParametersToRemove = engineParameters.getTaskParameters().stream() + .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList()); + final List<String> executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream() + .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList()); + final List<String> schemaParamKeysToRemove = + engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet() + .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList()); + EngineParameters aggregatedEngineParameters = + existingParameters.getEngineServiceParameters().getEngineParameters(); + aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove); + executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove); + schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters() + .getSchemaHelperParameterMap()::remove); + + final AxPolicyModel policyModel = main.getActivator().getPolicyModel(); + final List<AxArtifactKey> keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet() + .stream().filter(key -> !keyInfoKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream() + .filter(key -> !schemaKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream() + .filter(key -> !eventKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream() + .filter(key -> !albumKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream() + .filter(key -> !taskKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream() + .filter(key -> !policyKeystoRetain.contains(key)).collect(Collectors.toList()); + + final Map<AxArtifactKey, AxKeyInfo> keyInfoMap = + ModelService.getModel(AxKeyInformation.class).getKeyInfoMap(); + final Map<AxArtifactKey, AxContextSchema> schemasMap = + ModelService.getModel(AxContextSchemas.class).getSchemasMap(); + final Map<AxArtifactKey, AxEvent> eventMap = ModelService.getModel(AxEvents.class).getEventMap(); + final Map<AxArtifactKey, AxContextAlbum> albumsMap = + ModelService.getModel(AxContextAlbums.class).getAlbumsMap(); + final Map<AxArtifactKey, AxTask> taskMap = ModelService.getModel(AxTasks.class).getTaskMap(); + final Map<AxArtifactKey, AxPolicy> policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap(); + + keyInfoKeystoRemove.forEach(keyInfoMap::remove); + schemaKeystoRemove.forEach(schemasMap::remove); + eventKeystoRemove.forEach(eventMap::remove); + albumKeystoRemove.forEach(albumsMap::remove); + taskKeystoRemove.forEach(taskMap::remove); + policyKeystoRemove.forEach(policyMap::remove); } } - private Map<ToscaConceptIdentifier, String[]> createPolicyArgsMap(List<ToscaPolicy> policies) + private Map<ToscaConceptIdentifier, ApexMain> initiateApexEngineForPolicies(List<ToscaPolicy> policies) throws ApexStarterException { - Map<ToscaConceptIdentifier, String[]> policyArgsMap = new LinkedHashMap<>(); + Map<ToscaConceptIdentifier, ApexMain> mainMap = new LinkedHashMap<>(); for (ToscaPolicy policy : policies) { String policyName = policy.getIdentifier().getName(); final StandardCoder standardCoder = new StandardCoder(); @@ -103,34 +242,38 @@ public class ApexEngineHandler { throw new ApexStarterException(e); } final String[] apexArgs = {"-p", file.getAbsolutePath()}; - policyArgsMap.put(policy.getIdentifier(), apexArgs); + LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier()); + try { + ApexMain apexMain = new ApexMain(apexArgs); + mainMap.put(policy.getIdentifier(), apexMain); + } catch (Exception e) { + LOGGER.error("Execution of policy {} failed", policy.getIdentifier(), e); + } } - return policyArgsMap; + return mainMap; } /** * Method to get the APEX engine statistics. */ public List<AxEngineModel> getEngineStats() { - List<AxEngineModel> engineStats = null; - if (null != apexMain && apexMain.isAlive()) { - engineStats = apexMain.getEngineStats(); - } - return engineStats; + // engineStats from all the apexMain instances running individual tosca policies are combined here. + return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive())) + .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList()); } /** * Method to check whether the apex engine is running or not. */ public boolean isApexEngineRunning() { - return null != apexMain && apexMain.isAlive(); + return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive())); } /** * Method that return the list of running policies in the apex engine. */ public List<ToscaConceptIdentifier> getRunningPolicies() { - return new ArrayList<>(apexMain.getApexParametersMap().keySet()); + return new ArrayList<>(apexMainMap.keySet()); } /** @@ -139,8 +282,12 @@ public class ApexEngineHandler { public void shutdown() throws ApexStarterException { try { LOGGER.debug("Shutting down apex engine."); - apexMain.shutdown(); - apexMain = null; + for (ApexMain apexMain : apexMainMap.values()) { + apexMain.shutdown(); + } + apexMainMap.clear(); + ModelService.clear(); + ParameterService.clear(); } catch (final ApexException e) { throw new ApexStarterException(e); } |