diff options
author | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-01-25 11:03:42 +0000 |
---|---|---|
committer | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-01-28 13:14:51 +0000 |
commit | 658e67bc821a3bc55f2c6d877e7e0baa21427333 (patch) | |
tree | 5ab53c6026317858aec8b61bf0aec37459247cf5 /services | |
parent | 84f92b44e70ce27bb4213da677d50ac91169432c (diff) |
Improve handling of multiple policy in APEX PDP
Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39
Issue-ID: POLICY-2883
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services')
8 files changed, 907 insertions, 357 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index cadcc7ee8..3451c120c 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,35 +22,41 @@ package org.onap.policy.apex.service.engine.main; -import java.util.AbstractMap; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicies; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicy; import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; +import org.onap.policy.apex.model.policymodel.concepts.AxTasks; import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.runtime.EngineService; import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.onap.policy.common.parameters.ParameterService; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -66,13 +72,12 @@ public class ApexActivator { // The logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class); - // The parameters of the Apex activator when running with multiple policies @Getter @Setter - private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap; + private ApexParameters apexParameters; @Getter - Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap; + private AxPolicyModel policyModel; // Event unmarshalers are used to receive events asynchronously into Apex private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>(); @@ -93,10 +98,10 @@ public class ApexActivator { /** * Instantiate the activator for the Apex engine as a complete service. * - * @param parametersMap the apex parameters map for the Apex service + * @param parameters the apex parameters for the Apex service */ - public ApexActivator(Map<ToscaConceptIdentifier, ApexParameters> parametersMap) { - apexParametersMap = parametersMap; + public ApexActivator(ApexParameters parameters) { + apexParameters = parameters; } /** @@ -108,96 +113,131 @@ public class ApexActivator { LOGGER.debug("Apex engine starting as a service . . ."); try { - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - // totalInstanceCount is the sum of instance counts required as per each policy - int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); - apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); - engineKey = apexParameters.getEngineServiceParameters().getEngineKey(); instantiateEngine(apexParameters); setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { - LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); + try { + terminate(); + } catch (ApexException e1) { + LOGGER.error("Terminating the ApexActivator encountered error", e1); + } throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); } LOGGER.debug("Apex engine started as a service"); } - private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { - policyModelsMap = new LinkedHashMap<>(); - Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>(); - Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>(); - Set<Entry<ToscaConceptIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>( - apexParametersMap.entrySet()); - apexParamsEntrySet.stream().forEach(apexParamsEntry -> { - ApexParameters apexParams = apexParamsEntry.getValue(); - List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet()); - duplicateInputParameters.retainAll(inputParametersMap.keySet()); - List<String> duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet()); - duplicateOutputParameters.retainAll(outputParametersMap.keySet()); - - if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) { - LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.", - duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(), - apexParamsEntry.getKey().getVersion()); - apexParametersMap.remove(apexParamsEntry.getKey()); - return; - } - inputParametersMap.putAll(apexParams.getEventInputParameters()); - outputParametersMap.putAll(apexParams.getEventOutputParameters()); - // Check if a policy model file has been specified - if (apexParams.getEngineServiceParameters().getPolicyModel() != null) { - LOGGER.debug("deploying policy model to the apex engines . . ."); - try { - final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel(); - AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString); - policyModelsMap.put(apexParamsEntry.getKey(), policyModel); - } catch (ApexException e) { - throw new ApexRuntimeException("Failed to create the apex model.", e); - } - } - }); - AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + private void instantiateEngine(ApexParameters apexParameters) throws ApexException { + if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { + throw new ApexException("Apex Engine already initialized."); + } + // Create engine with specified thread count + LOGGER.debug("starting apex engine service . . ."); + apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters()); + + // Create the engine holder to hold the engine's references and act as an event + // receiver + engineServiceHandler = new ApexEngineServiceHandler(apexEngineService); + } + + private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { + AxPolicyModel model; + try { + final String policyModelString = apexParameters.getEngineServiceParameters().getPolicyModel(); + model = EngineServiceImpl.createModel(apexParameters.getEngineServiceParameters().getEngineKey(), + policyModelString); + } catch (ApexException e) { + throw new ApexRuntimeException("Failed to create the apex model.", e); + } + AxKeyInformation existingKeyInformation = null; + AxContextSchemas existingSchemas = null; + AxEvents existingEvents = null; + AxContextAlbums existingAlbums = null; + AxTasks existingTasks = null; + AxPolicies existingPolicies = null; + if (ModelService.existsModel(AxPolicyModel.class)) { + existingKeyInformation = new AxKeyInformation(ModelService.getModel(AxKeyInformation.class)); + existingSchemas = new AxContextSchemas(ModelService.getModel(AxContextSchemas.class)); + existingEvents = new AxEvents(ModelService.getModel(AxEvents.class)); + existingAlbums = new AxContextAlbums(ModelService.getModel(AxContextAlbums.class)); + existingTasks = new AxTasks(ModelService.getModel(AxTasks.class)); + existingPolicies = new AxPolicies(ModelService.getModel(AxPolicies.class)); + } // Set the policy model in the engine - apexEngineService.updateModel(engineKey, finalPolicyModel, true); + try { + apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), model, true); + policyModel = new AxPolicyModel(model); + } catch (ApexException exp) { + // Discard concepts from the current policy + if (null != existingKeyInformation) { + // Update model service with other policies' concepts. + ModelService.registerModel(AxKeyInformation.class, existingKeyInformation); + ModelService.registerModel(AxContextSchemas.class, existingSchemas); + ModelService.registerModel(AxEvents.class, existingEvents); + ModelService.registerModel(AxContextAlbums.class, existingAlbums); + ModelService.registerModel(AxTasks.class, existingTasks); + ModelService.registerModel(AxPolicies.class, existingPolicies); + } else { + ModelService.clear(); + } + throw exp; + } + if (null != existingKeyInformation) { + // Make sure all concepts in previously deployed policies are retained in ModelService + // during multi policy deployment. + updateModelService(existingKeyInformation, existingSchemas, existingEvents, existingAlbums, existingTasks, + existingPolicies); + } - handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap); - setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), + apexParameters.getEventInputParameters(), apexParameters.getEventOutputParameters()); // Wire up pairings between marhsallers and unmarshallers - setUpMarshalerPairings(inputParametersMap); + setUpMarshalerPairings(apexParameters.getEventInputParameters()); // Start event processing - startUnmarshallers(inputParametersMap); + startUnmarshallers(apexParameters.getEventInputParameters()); } - private AxPolicyModel aggregatePolicyModels(Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap) { - // Doing a deep copy so that original values in policyModelsMap is retained - // after reduction operation - Set<Entry<ToscaConceptIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream() - .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet()); - Optional<Entry<ToscaConceptIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream() - .reduce((entry1, entry2) -> { - try { - entry1.setValue( - PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - }); - AxPolicyModel finalPolicyModel = null; - if (finalPolicyModelEntry.isPresent()) { - finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue()); + private void updateModelService(AxKeyInformation existingKeyInformation, + AxContextSchemas existingSchemas, AxEvents existingEvents, + AxContextAlbums existingAlbums, AxTasks existingTasks, + AxPolicies existingPolicies) throws ApexModelException { + + AxContextSchemas axContextSchemas = ModelService.getModel(AxContextSchemas.class); + AxEvents axEvents = ModelService.getModel(AxEvents.class); + AxContextAlbums axContextAlbums = ModelService.getModel(AxContextAlbums.class); + AxTasks axTasks = ModelService.getModel(AxTasks.class); + AxPolicies axPolicies = ModelService.getModel(AxPolicies.class); + + Map<AxArtifactKey, AxContextSchema> newSchemasMap = axContextSchemas.getSchemasMap(); + Map<AxArtifactKey, AxEvent> newEventsMap = axEvents.getEventMap(); + Map<AxArtifactKey, AxContextAlbum> newAlbumsMap = axContextAlbums.getAlbumsMap(); + Map<AxArtifactKey, AxTask> newTasksMap = axTasks.getTaskMap(); + Map<AxArtifactKey, AxPolicy> newPoliciesMap = axPolicies.getPolicyMap(); + + StringBuilder errorMessage = new StringBuilder(); + PolicyModelMerger.checkForDuplicateItem(existingSchemas.getSchemasMap(), newSchemasMap, errorMessage, "schema"); + PolicyModelMerger.checkForDuplicateItem(existingEvents.getEventMap(), newEventsMap, errorMessage, "event"); + PolicyModelMerger.checkForDuplicateItem(existingAlbums.getAlbumsMap(), newAlbumsMap, errorMessage, "album"); + PolicyModelMerger.checkForDuplicateItem(existingTasks.getTaskMap(), newTasksMap, errorMessage, "task"); + PolicyModelMerger.checkForDuplicateItem(existingPolicies.getPolicyMap(), newPoliciesMap, errorMessage, + "policy"); + if (errorMessage.length() > 0) { + throw new ApexModelException(errorMessage.toString()); } - return finalPolicyModel; + + AxKeyInformation axKeyInformation = ModelService.getModel(AxKeyInformation.class); + Map<AxArtifactKey, AxKeyInfo> newKeyInfoMap = axKeyInformation.getKeyInfoMap(); + // Now add all the concepts that must be copied over + newKeyInfoMap.putAll(existingKeyInformation.getKeyInfoMap()); + newSchemasMap.putAll(existingSchemas.getSchemasMap()); + newEventsMap.putAll(existingEvents.getEventMap()); + newAlbumsMap.putAll(existingAlbums.getAlbumsMap()); + newTasksMap.putAll(existingTasks.getTaskMap()); + newPoliciesMap.putAll(existingPolicies.getPolicyMap()); } private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, @@ -226,38 +266,6 @@ public class ApexActivator { } } - private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap, - Map<String, EventHandlerParameters> outputParametersMap) { - // stop and remove any marshaller/unmarshaller that is part of a policy that is - // undeployed - marshallerMap.entrySet().stream() - .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey())) - .forEach(marshallerEntry -> marshallerEntry.getValue().stop()); - marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey)); - unmarshallerMap.entrySet().stream() - .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey())) - .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop()); - unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey)); - - // If a marshaller/unmarshaller is already initialized, they don't need to be - // reinitialized during model update. - outputParametersMap.keySet().removeIf(marshallerMap::containsKey); - inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey); - } - - private void instantiateEngine(ApexParameters apexParameters) throws ApexException { - if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { - throw new ApexException("Apex Engine already initialized."); - } - // Create engine with specified thread count - LOGGER.debug("starting apex engine service . . ."); - apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters()); - - // Create the engine holder to hold the engine's references and act as an event - // receiver - engineServiceHandler = new ApexEngineServiceHandler(apexEngineService); - } - /** * Set up unmarshaler/marshaler pairing for synchronized event handling. We only * need to traverse the unmarshalers because the unmarshalers and marshalers are @@ -275,8 +283,8 @@ public class ApexActivator { // Check if the unmarshaler is synchronized with a marshaler if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler - final ApexEventMarshaller peeredMarshaler = marshallerMap - .get(inputParameters.getValue().getPeer(peeredMode)); + final ApexEventMarshaller peeredMarshaler = + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); @@ -298,22 +306,6 @@ public class ApexActivator { } /** - * Updates the APEX Engine with the model created from new Policies. - * - * @param apexParamsMap the apex parameters map for the Apex service - * @throws ApexException on errors - */ - public void updateModel(Map<ToscaConceptIdentifier, ApexParameters> apexParamsMap) throws ApexException { - try { - ApexParameters apexParameters = apexParamsMap.values().iterator().next(); - setUpModelMarshallerAndUnmarshaller(apexParameters); - } catch (final Exception e) { - LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); - throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); - } - } - - /** * Get the Apex engine worker stats. */ public List<AxEngineModel> getEngineStats() { @@ -338,19 +330,25 @@ public class ApexActivator { engineServiceHandler.terminate(); engineServiceHandler = null; } - - // Clear the services - ModelService.clear(); - ParameterService.clear(); + // Clear the services in case if this was the only policy running in the engine + ApexParameters apexParams = null; + if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) { + apexParams = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + } + if (null != apexParams + && apexParameters.getEventInputParameters().size() == apexParams.getEventInputParameters().size()) { + ModelService.clear(); + ParameterService.clear(); + } } /** * Shuts down all marshallers and unmarshallers. */ private void shutdownMarshallerAndUnmarshaller() { - marshallerMap.values().forEach(ApexEventMarshaller::stop); - marshallerMap.clear(); unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop); unmarshallerMap.clear(); + marshallerMap.values().forEach(ApexEventMarshaller::stop); + marshallerMap.clear(); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java index fd6ac4489..fdc404c67 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -347,17 +347,16 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable { // Order the stop stopOrderedFlag = true; - // Order a stop on the synchronous cache if one exists - if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS) - && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) { - ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop(); - } - // Wait for thread shutdown while (unmarshallerThread != null && unmarshallerThread.isAlive()) { ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL); } + // Order a stop on the synchronous cache if one exists + if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS) + && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) { + ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop(); + } LOGGER.exit("shut down Apex event unmarshaller"); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java index 5707c95ad..65c2acffa 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modification Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,26 +24,20 @@ package org.onap.policy.apex.service.engine.main; import java.util.Arrays; import java.util.Base64; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Map.Entry; -import java.util.TreeMap; +import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; -import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; -import org.onap.policy.apex.model.basicmodel.service.ModelService; -import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; -import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; -import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; import org.onap.policy.apex.service.parameters.ApexParameterHandler; import org.onap.policy.apex.service.parameters.ApexParameters; +import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.common.parameters.ParameterService; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -58,14 +52,12 @@ public class ApexMain { private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexMain.class); // The Apex Activator that activates the Apex engine + @Getter private ApexActivator activator; // The parameters read in from JSON for each policy @Getter - private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap; - - //engineParameters are aggregated in case of multiple policies - private EngineParameters aggregatedEngineParameters; + private ApexParameters apexParameters; private ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); @@ -77,73 +69,27 @@ public class ApexMain { * Instantiates the Apex service. * * @param args the command line arguments + * @throws ApexException the apex exception. */ - public ApexMain(final String[] args) { + public ApexMain(final String[] args) throws ApexException { LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . ."); - apexParametersMap = new LinkedHashMap<>(); - aggregatedEngineParameters = new EngineParameters(); try { - apexParametersMap.put(new ToscaConceptIdentifier(), populateApexParameters(args)); + apexParameters = populateApexParameters(args); } catch (ApexException e) { LOGGER.error(APEX_SERVICE_FAILED_MSG, e); return; } - apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next()); - // Now, create the activator for the Apex service - activator = new ApexActivator(apexParametersMap); - - // Start the activator - try { - activator.initialize(); - setAlive(true); - } catch (final ApexActivatorException e) { - LOGGER.error("start of Apex service failed, used parameters are " + Arrays.toString(args), e); - return; - } - - // Add a shutdown hook to shut everything down in an orderly manner - Runtime.getRuntime().addShutdownHook(new ApexMainShutdownHookClass()); - LOGGER.exit("Started Apex"); - } - - /** - * Instantiates the Apex service for multiple policies. - * - * @param policyArgumentsMap the map with command line arguments as value and policy-id as key - * @throws ApexException on errors - */ - public ApexMain(Map<ToscaConceptIdentifier, String[]> policyArgumentsMap) throws ApexException { - apexParametersMap = new LinkedHashMap<>(); - aggregatedEngineParameters = new EngineParameters(); - for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) { - try { - apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); - } catch (ApexException e) { - LOGGER.error("Invalid arguments specified for policy - " + policyArgsEntry.getKey().getName() + ":" - + policyArgsEntry.getKey().getVersion(), e); - } - } - if (apexParametersMap.isEmpty()) { - LOGGER.error(APEX_SERVICE_FAILED_MSG); - return; - } + aggregateParametersAndRegister(); - // Set the aggregated engineParameters which will be used later while creating the engine - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); - apexParameterHandler.registerParameters(apexParameters); // Now, create the activator for the Apex service - activator = new ApexActivator(apexParametersMap); + activator = new ApexActivator(apexParameters); // Start the activator try { activator.initialize(); - apexParametersMap = activator.getApexParametersMap(); setAlive(true); } catch (final ApexActivatorException e) { - LOGGER.error(APEX_SERVICE_FAILED_MSG, e); - activator.terminate(); - return; + throw new ApexException("start of Apex service failed, used parameters are " + Arrays.toString(args), e); } // Add a shutdown hook to shut everything down in an orderly manner @@ -151,75 +97,6 @@ public class ApexMain { LOGGER.exit("Started Apex"); } - /** - * Updates the APEX Engine with the model created from new Policies. - * - * @param policyArgsMap the map with command line arguments as value and policy-id as key - * @throws ApexException on errors - */ - public void updateModel(Map<ToscaConceptIdentifier, String[]> policyArgsMap) throws ApexException { - // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine - boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey); - apexParametersMap.clear(); - aggregatedEngineParameters = new EngineParameters(); - AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class); - Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>(); - for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) { - findAlbumsToHold(albumsMap, policyArgsEntry.getKey()); - try { - apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); - } catch (ApexException e) { - LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(), - policyArgsEntry.getKey().getVersion(), e); - } - } - // Set the aggregated engineParameters - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); - ParameterService.clear(); - apexParameterHandler.registerParameters(apexParameters); - try { - if (albumsMap.isEmpty() && !isAnyPolicyDeployed) { - // clear context since none of the policies' context albums has to be retained - // this could be because all policies have a major version change, - // or a full set of new policies are received in the update message - activator.terminate(); - // ParameterService is cleared when activator is terminated. Register the parameters again in this case - apexParameterHandler.registerParameters(apexParameters); - activator = new ApexActivator(apexParametersMap); - activator.initialize(); - setAlive(true); - } else { - albums.setAlbumsMap(albumsMap); - activator.setApexParametersMap(apexParametersMap); - activator.updateModel(apexParametersMap); - } - } catch (final ApexException e) { - LOGGER.error(APEX_SERVICE_FAILED_MSG, e); - activator.terminate(); - throw new ApexException(e.getMessage()); - } - } - - /** - * Find the context albums which should be retained when updating the policies. - * - * @param albumsMap the albums which should be kept during model update - * @param policyId the policy id of current policy - */ - private void findAlbumsToHold(Map<AxArtifactKey, AxContextAlbum> albumsMap, ToscaConceptIdentifier policyId) { - for (Entry<ToscaConceptIdentifier, AxPolicyModel> policyModelsEntry : activator.getPolicyModelsMap() - .entrySet()) { - // If a policy with the same major version is received in PDP_UPDATE, - // context for that policy has to be retained. For this, take such policies' albums - if (policyModelsEntry.getKey().getName().equals(policyId.getName()) - && policyModelsEntry.getKey().getVersion().split("\\.")[0] - .equals(policyId.getVersion().split("\\.")[0])) { - albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap()); - } - } - } - private ApexParameters populateApexParameters(String[] args) throws ApexException { // Check the arguments final ApexCommandLineArguments arguments = new ApexCommandLineArguments(); @@ -263,11 +140,32 @@ public class ApexMain { ehParameterEntry.getValue().setName(ehParameterEntry.getKey()); } } - aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters()); return axParameters; } - private void aggregateEngineParameters(EngineParameters engineParameters) { + private void aggregateParametersAndRegister() throws ApexException { + ApexParameters aggregatedParameters = null; + if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) { + aggregatedParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + } else { + aggregatedParameters = new ApexParameters(); + aggregatedParameters.setEngineServiceParameters(new EngineServiceParameters()); + apexParameterHandler.registerParameters(aggregatedParameters); + } + List<String> duplicateInputParameters = aggregatedParameters.getEventInputParameters().keySet().stream() + .filter(apexParameters.getEventInputParameters()::containsKey).collect(Collectors.toList()); + List<String> duplicateOutputParameters = aggregatedParameters.getEventOutputParameters().keySet().stream() + .filter(apexParameters.getEventOutputParameters()::containsKey).collect(Collectors.toList()); + if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) { + throw new ApexException( + "start of Apex service failed because this policy has the following duplicate I/O parameters: " + + duplicateInputParameters + "/" + duplicateOutputParameters); + } + aggregatedParameters.getEventInputParameters().putAll(apexParameters.getEventInputParameters()); + aggregatedParameters.getEventOutputParameters().putAll(apexParameters.getEventOutputParameters()); + EngineParameters aggregatedEngineParameters = + aggregatedParameters.getEngineServiceParameters().getEngineParameters(); + EngineParameters engineParameters = apexParameters.getEngineServiceParameters().getEngineParameters(); aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters()); aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap()); aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap() @@ -346,8 +244,9 @@ public class ApexMain { * The main method. * * @param args the arguments + * @throws ApexException the apex exception. */ - public static void main(final String[] args) { + public static void main(final String[] args) throws ApexException { new ApexMain(args); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java index 9a07b4ad5..673d9cab5 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,8 +73,6 @@ public class ApexParameterHandler { */ public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException { - ParameterService.clear(); - ApexParameters parameters = null; String toscaPolicyFilePath = arguments.getToscaPolicyFilePath(); // Read the parameters diff --git a/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java b/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java index 68472d4de..2cb12c397 100644 --- a/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java +++ b/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,21 +23,22 @@ package org.onap.policy.apex.service.engine.main; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.io.PrintStream; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Test; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; -import org.onap.policy.apex.service.parameters.ApexParameters; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.common.parameters.ParameterService; /** * Test the ApexMain class. @@ -114,13 +115,13 @@ public class ApexMainTest { OutputStream outContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(outContent)); - String[] args = { "-p", "src/test/resources/parameters/correctParams.json" }; + String[] args = {"-p", "src/test/resources/parameters/correctParams.json"}; final ApexMain apexMain = new ApexMain(args); - assertEquals("MyApexEngine", - apexMain.getApexParametersMap().values().iterator().next().getEngineServiceParameters().getName()); - await().atMost(200, TimeUnit.MILLISECONDS).until(() -> outContent.toString() - .contains("Added the action listener to the engine")); + assertEquals("MyApexEngine", apexMain.getApexParameters().getEngineServiceParameters().getName()); + await().atMost(200, TimeUnit.MILLISECONDS) + .until(() -> outContent.toString().contains("Added the action listener to the engine")); + assertTrue(apexMain.isAlive()); apexMain.shutdown(); } @@ -129,16 +130,15 @@ public class ApexMainTest { OutputStream outContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(outContent)); - String[] args = { "-p", "src/test/resources/parameters/correctParamsJavaProperties.json" }; + String[] args = {"-p", "src/test/resources/parameters/correctParamsJavaProperties.json"}; final ApexMain apexMain = new ApexMain(args); - assertEquals("MyApexEngine", - apexMain.getApexParametersMap().values().iterator().next().getEngineServiceParameters().getName()); + assertEquals("MyApexEngine", apexMain.getApexParameters().getEngineServiceParameters().getName()); assertEquals("trust-store-file", System.getProperty("javax.net.ssl.trustStore")); assertEquals("Pol1cy_0nap", System.getProperty("javax.net.ssl.trustStorePassword")); - await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> outContent.toString() - .contains("Added the action listener to the engine")); + await().atMost(10000, TimeUnit.MILLISECONDS) + .until(() -> outContent.toString().contains("Added the action listener to the engine")); apexMain.shutdown(); } @@ -146,46 +146,41 @@ public class ApexMainTest { public void testCorrectParametersWithMultiplePolicies() throws ApexException { OutputStream outContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(outContent)); - Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>(); - String[] args = {"-p", "src/test/resources/parameters/correctParams.json"}; - argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args); - final ApexMain apexMain = new ApexMain(argsMap); - ApexParameters apexParam = (ApexParameters) apexMain.getApexParametersMap().values().toArray()[0]; - assertEquals("MyApexEngine", apexParam.getEngineServiceParameters().getName()); - apexMain.shutdown(); + String[] args1 = {"-p", "src/test/resources/parameters/correctParams.json"}; + String[] args2 = {"-p", "src/test/resources/parameters/correctParams2.json"}; + final ApexMain apexMain1 = new ApexMain(args1); + final ApexMain apexMain2 = new ApexMain(args2); + assertEquals("MyApexEngine", apexMain1.getApexParameters().getEngineServiceParameters().getName()); + assertEquals("MyApexEngine2", apexMain2.getApexParameters().getEngineServiceParameters().getName()); final String outString = outContent.toString(); - assertThat(outString).contains("Added the action listener to the engine"); + assertThat(outString).contains("Added the action listener to the engine") + .contains("Created apex engine MyApexEngine").contains("Created apex engine MyApexEngine2"); + assertTrue(apexMain1.isAlive()); + assertTrue(apexMain2.isAlive()); + apexMain1.shutdown(); + apexMain2.shutdown(); + ModelService.clear(); + ParameterService.clear(); } @Test public void testInCorrectParametersWithMultiplePolicies() throws ApexException { - OutputStream outContent = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outContent)); - Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>(); String[] args = {"-p", "src/test/resources/parameters/correctParams.json"}; - argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args); - argsMap.put(new ToscaConceptIdentifier("id2", "v2"), args); - final ApexMain apexMain = new ApexMain(argsMap); - ApexParameters apexParam = (ApexParameters) apexMain.getApexParametersMap().values().toArray()[0]; - assertEquals("MyApexEngine", apexParam.getEngineServiceParameters().getName()); - apexMain.shutdown(); - final String outString = outContent.toString(); - assertThat(outString).contains("I/O Parameters [TheFileConsumer1]/[FirstProducer] for id2:v2 are duplicates. " - + "So this policy is not executed"); - assertEquals(1, apexMain.getApexParametersMap().size()); // only id1:v1 is kept in the map, id2:v2 failed + final ApexMain apexMain1 = new ApexMain(args); + assertThatThrownBy(() -> new ApexMain(args)).hasMessage("start of Apex service failed because this" + + " policy has the following duplicate I/O parameters: [TheFileConsumer1]/[FirstProducer]"); + apexMain1.shutdown(); } @Test public void testInvalidArgsWithMultiplePolicies() throws ApexException { OutputStream outContent = new ByteArrayOutputStream(); System.setOut(new PrintStream(outContent)); - Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>(); String[] args = {"-c", "file1", "-m", "file2"}; - argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args); - final ApexMain apexMain = new ApexMain(argsMap); + final ApexMain apexMain = new ApexMain(args); final String outString = outContent.toString(); apexMain.shutdown(); assertThat(outString).contains("Arguments validation failed", "start of Apex service failed"); - assertThat(apexMain.getApexParametersMap()).isEmpty(); // No policy is running in the engine + assertFalse(apexMain.isAlive()); // No policy is running in the engine } } diff --git a/services/services-engine/src/test/resources/parameters/correctParams.json b/services/services-engine/src/test/resources/parameters/correctParams.json index 43fef1cbe..7ea06d715 100644 --- a/services/services-engine/src/test/resources/parameters/correctParams.json +++ b/services/services-engine/src/test/resources/parameters/correctParams.json @@ -501,7 +501,8 @@ }, "eventProtocolParameters": { "eventProtocol": "JSON" - } + }, + "eventName": "BasicEvent" } } } diff --git a/services/services-engine/src/test/resources/parameters/correctParams2.json b/services/services-engine/src/test/resources/parameters/correctParams2.json new file mode 100644 index 000000000..2d2ebff23 --- /dev/null +++ b/services/services-engine/src/test/resources/parameters/correctParams2.json @@ -0,0 +1,513 @@ +{ + "tosca_definitions_version": "tosca_simple_yaml_1_1_0", + "topology_template": { + "policies": [ + { + "onap.policies.apex.Simplecontrolloop": { + "type": "onap.policies.native.Apex", + "type_version": "1.0.0", + "name": "onap.policies.apex.Simplecontrolloop", + "version": "1.0.0", + "properties": { + "engineServiceParameters": { + "name": "MyApexEngine2", + "version": "0.0.1", + "id": 45, + "instanceCount": 2, + "deploymentPort": 65522, + "policy_type_impl": { + "apexPolicyModel": { + "key": { + "name": "SmallModel", + "version": "0.0.1" + }, + "keyInformation": { + "key": { + "name": "SmallModel_KeyInfo", + "version": "0.0.1" + }, + "keyInfoMap": { + "entry": [ + { + "key": { + "name": "BasicContextAlbum", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicContextAlbum", + "version": "0.0.1" + }, + "UUID": "fec1b353-b35f-4384-b7d9-69622059c248", + "description": "Generated description for a concept called \"BasicContextAlbum\" with version \"0.0.1\" and UUID \"fec1b353-b35f-4384-b7d9-69622059c248\"" + } + }, + { + "key": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "UUID": "cc8d3c1a-e975-459a-bcd2-69f423eaa1f3", + "description": "Generated description for a concept called \"BasicEvent\" with version \"0.0.1\" and UUID \"cc8d3c1a-e975-459a-bcd2-69f423eaa1f3\"" + } + }, + { + "key": { + "name": "BasicPolicy", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicPolicy", + "version": "0.0.1" + }, + "UUID": "d0c5d8ee-5fe7-4978-89ce-4a3e69cad043", + "description": "Generated description for a concept called \"BasicPolicy\" with version \"0.0.1\" and UUID \"d0c5d8ee-5fe7-4978-89ce-4a3e69cad043\"" + } + }, + { + "key": { + "name": "BasicTask", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicTask", + "version": "0.0.1" + }, + "UUID": "c5651414-fc1c-493b-878d-75f0ce685c36", + "description": "Generated description for a concept called \"BasicTask\" with version \"0.0.1\" and UUID \"c5651414-fc1c-493b-878d-75f0ce685c36\"" + } + }, + { + "key": { + "name": "IntType", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "IntType", + "version": "0.0.1" + }, + "UUID": "790ff718-8dc0-44e0-89d8-1b3bbe238310", + "description": "Generated description for a concept called \"IntType\" with version \"0.0.1\" and UUID \"790ff718-8dc0-44e0-89d8-1b3bbe238310\"" + } + }, + { + "key": { + "name": "SmallModel", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel", + "version": "0.0.1" + }, + "UUID": "a1bd1f4e-713b-456b-b1a8-bb48beee28e8", + "description": "Generated description for a concept called \"SmallModel\" with version \"0.0.1\" and UUID \"a1bd1f4e-713b-456b-b1a8-bb48beee28e8\"" + } + }, + { + "key": { + "name": "SmallModel_Albums", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_Albums", + "version": "0.0.1" + }, + "UUID": "72bed9af-ab7d-3379-b9f7-b5eca5c9ef22", + "description": "Generated description for concept referred to by key \"SmallModel_Albums:0.0.1\"" + } + }, + { + "key": { + "name": "SmallModel_Events", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_Events", + "version": "0.0.1" + }, + "UUID": "796dc6b0-627d-34ae-a5e2-1bc4b4b486b8", + "description": "Generated description for concept referred to by key \"SmallModel_Events:0.0.1\"" + } + }, + { + "key": { + "name": "SmallModel_KeyInfo", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_KeyInfo", + "version": "0.0.1" + }, + "UUID": "b4876774-6907-3d27-a2b8-f05737c5ee4a", + "description": "Generated description for concept referred to by key \"SmallModel_KeyInfo:0.0.1\"" + } + }, + { + "key": { + "name": "SmallModel_Policies", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_Policies", + "version": "0.0.1" + }, + "UUID": "5bcf946b-67be-3190-a906-f954896f999f", + "description": "Generated description for concept referred to by key \"SmallModel_Policies:0.0.1\"" + } + }, + { + "key": { + "name": "SmallModel_Schemas", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_Schemas", + "version": "0.0.1" + }, + "UUID": "c25bf5c3-7f1e-3667-b8a9-971ba21517bc", + "description": "Generated description for concept referred to by key \"SmallModel_Schemas:0.0.1\"" + } + }, + { + "key": { + "name": "SmallModel_Tasks", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "SmallModel_Tasks", + "version": "0.0.1" + }, + "UUID": "43b015ca-2ed1-3a35-b103-e8a5aa68f1ef", + "description": "Generated description for concept referred to by key \"SmallModel_Tasks:0.0.1\"" + } + } + ] + } + }, + "policies": { + "key": { + "name": "SmallModel_Policies", + "version": "0.0.1" + }, + "policyMap": { + "entry": [ + { + "key": { + "name": "BasicPolicy", + "version": "0.0.1" + }, + "value": { + "policyKey": { + "name": "BasicPolicy", + "version": "0.0.1" + }, + "template": "FREEFORM", + "state": { + "entry": [ + { + "key": "OnlyState", + "value": { + "stateKey": { + "parentKeyName": "BasicPolicy", + "parentKeyVersion": "0.0.1", + "parentLocalName": "NULL", + "localName": "OnlyState" + }, + "trigger": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "stateOutputs": { + "entry": [ + { + "key": "OnlyOutput", + "value": { + "key": { + "parentKeyName": "BasicPolicy", + "parentKeyVersion": "0.0.1", + "parentLocalName": "OnlyState", + "localName": "OnlyOutput" + }, + "outgoingEvent": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "nextState": { + "parentKeyName": "NULL", + "parentKeyVersion": "0.0.0", + "parentLocalName": "NULL", + "localName": "NULL" + } + } + } + ] + }, + "contextAlbumReference": [ + { + "name": "BasicContextAlbum", + "version": "0.0.1" + } + ], + "taskSelectionLogic": { + "key": "NULL", + "logicFlavour": "UNDEFINED", + "logic": "" + }, + "stateFinalizerLogicMap": { + "entry": [] + }, + "defaultTask": { + "name": "BasicTask", + "version": "0.0.1" + }, + "taskReferences": { + "entry": [ + { + "key": { + "name": "BasicTask", + "version": "0.0.1" + }, + "value": { + "key": { + "parentKeyName": "BasicPolicy", + "parentKeyVersion": "0.0.1", + "parentLocalName": "OnlyState", + "localName": "BasicTask" + }, + "outputType": "DIRECT", + "output": { + "parentKeyName": "BasicPolicy", + "parentKeyVersion": "0.0.1", + "parentLocalName": "OnlyState", + "localName": "OnlyOutput" + } + } + } + ] + } + } + } + ] + }, + "firstState": "OnlyState" + } + } + ] + } + }, + "tasks": { + "key": { + "name": "SmallModel_Tasks", + "version": "0.0.1" + }, + "taskMap": { + "entry": [ + { + "key": { + "name": "BasicTask", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicTask", + "version": "0.0.1" + }, + "inputFields": { + "entry": [ + { + "key": "intPar", + "value": { + "key": "intPar", + "fieldSchemaKey": { + "name": "IntType", + "version": "0.0.1" + }, + "optional": false + } + } + ] + }, + "outputFields": { + "entry": [ + { + "key": "intPar", + "value": { + "key": "intPar", + "fieldSchemaKey": { + "name": "IntType", + "version": "0.0.1" + }, + "optional": false + } + } + ] + }, + "taskParameters": { + "entry": [] + }, + "contextAlbumReference": [ + { + "name": "BasicContextAlbum", + "version": "0.0.1" + } + ], + "taskLogic": { + "key": "TaskLogic", + "logicFlavour": "JAVASCRIPT", + "logic": "executor.logger.debug(executor.subject.id);\nvar gc = executor.getContextAlbum(\"BasicContextAlbum\");\nexecutor.logger.debug(gc.name);\nexecutor.logger.debug(executor.inFields);\n\nexecutor.logger.debug(executor.eo);\n\nvar returnValue = executor.isTrue;" + } + } + } + ] + } + }, + "events": { + "key": { + "name": "SmallModel_Events", + "version": "0.0.1" + }, + "eventMap": { + "entry": [ + { + "key": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicEvent", + "version": "0.0.1" + }, + "nameSpace": "org.onap.policy.apex.events", + "source": "source", + "target": "target", + "parameter": { + "entry": [ + { + "key": "intPar", + "value": { + "key": "intPar", + "fieldSchemaKey": { + "name": "IntType", + "version": "0.0.1" + }, + "optional": false + } + } + ] + } + } + } + ] + } + }, + "albums": { + "key": { + "name": "SmallModel_Albums", + "version": "0.0.1" + }, + "albums": { + "entry": [ + { + "key": { + "name": "BasicContextAlbum", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "BasicContextAlbum", + "version": "0.0.1" + }, + "scope": "GLOBAL", + "isWritable": true, + "itemSchema": { + "name": "IntType", + "version": "0.0.1" + } + } + } + ] + } + }, + "schemas": { + "key": { + "name": "SmallModel_Schemas", + "version": "0.0.1" + }, + "schemas": { + "entry": [ + { + "key": { + "name": "IntType", + "version": "0.0.1" + }, + "value": { + "key": { + "name": "IntType", + "version": "0.0.1" + }, + "schemaFlavour": "Java", + "schemaDefinition": "java.lang.Integer" + } + } + ] + } + } + } + }, + "engineParameters": { + "executorParameters": { + "JAVASCRIPT": { + "parameterClassName": "org.onap.policy.apex.service.engine.parameters.dummyclasses.SuperDooperExecutorParameters" + } + } + } + }, + "eventOutputParameters": { + "FirstProducer2": { + "carrierTechnologyParameters": { + "carrierTechnology": "FILE", + "parameters": { + "standardIo": true + } + }, + "eventProtocolParameters": { + "eventProtocol": "JSON" + } + } + }, + "eventInputParameters": { + "TheFileConsumer2": { + "carrierTechnologyParameters": { + "carrierTechnology": "FILE", + "parameters": { + "fileName": "src/test/resources/events/TestPojoEvent.json" + } + }, + "eventProtocolParameters": { + "eventProtocol": "JSON" + }, + "eventName": "BasicEvent" + } + } + } + } + } + ] + } +}
\ No newline at end of file diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java index 4d560a3bb..03e97215b 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,13 +24,36 @@ package org.onap.policy.apex.services.onappf.handler; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.onap.policy.apex.core.engine.EngineParameters; +import org.onap.policy.apex.core.engine.TaskParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicies; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicy; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; +import org.onap.policy.apex.model.policymodel.concepts.AxTasks; import org.onap.policy.apex.service.engine.main.ApexMain; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; +import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.services.onappf.exception.ApexStarterException; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @@ -49,7 +72,7 @@ public class ApexEngineHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class); - private ApexMain apexMain; + private Map<ToscaConceptIdentifier, ApexMain> apexMainMap; /** * Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine. @@ -57,13 +80,13 @@ public class ApexEngineHandler { * @param policies the list of policies * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ - public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException { - Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies); + public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException { LOGGER.debug("Starting apex engine."); - try { - apexMain = new ApexMain(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + apexMainMap = initiateApexEngineForPolicies(policies); + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + throw new ApexStarterException("Apex Engine failed to start."); } } @@ -74,20 +97,136 @@ public class ApexEngineHandler { * @throws ApexStarterException if the apex engine instantiation failed using the policies passed */ public void updateApexEngine(List<ToscaPolicy> policies) throws ApexStarterException { - if (null == apexMain || !apexMain.isAlive()) { - throw new ApexStarterException("Apex Engine not initialized."); + List<ToscaConceptIdentifier> runningPolicies = getRunningPolicies(); + List<ToscaPolicy> policiesToDeploy = policies.stream() + .filter(policy -> !runningPolicies.contains(policy.getIdentifier())).collect(Collectors.toList()); + List<ToscaConceptIdentifier> policiesToUnDeploy = runningPolicies.stream() + .filter(polId -> policies.stream().noneMatch(policy -> policy.getIdentifier().equals(polId))) + .collect(Collectors.toList()); + Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap = new LinkedHashMap<>(); + policiesToUnDeploy.forEach(policyId -> { + ApexMain apexMain = apexMainMap.get(policyId); + try { + apexMain.shutdown(); + undeployedPoliciesMainMap.put(policyId, apexMain); + apexMainMap.remove(policyId); + } catch (ApexException e) { + LOGGER.error("Shutting down policy {} failed", policyId, e); + } + }); + if (!undeployedPoliciesMainMap.isEmpty()) { + updateModelAndParameterServices(undeployedPoliciesMainMap); } - Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies); - try { - apexMain.updateModel(policyArgsMap); - } catch (ApexException e) { - throw new ApexStarterException(e); + if (!policiesToDeploy.isEmpty()) { + Map<ToscaConceptIdentifier, ApexMain> mainMap = initiateApexEngineForPolicies(policiesToDeploy); + if (mainMap.isEmpty()) { + throw new ApexStarterException("Updating the APEX engine with new policies failed."); + } + apexMainMap.putAll(mainMap); + } + if (apexMainMap.isEmpty()) { + ModelService.clear(); + ParameterService.clear(); + } + } + + /** + * Clear the corresponding items from ModelService and ParameterService. + * + * @param undeployedPoliciesMainMap the policies that are undeployed + */ + private void updateModelAndParameterServices(Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap) { + Set<String> inputParamKeysToRetain = new HashSet<>(); + Set<String> outputParamKeysToRetain = new HashSet<>(); + List<TaskParameters> taskParametersToRetain = new ArrayList<>(); + List<String> executorParamKeysToRetain = new ArrayList<>(); + List<String> schemaParamKeysToRetain = new ArrayList<>(); + + List<AxArtifactKey> keyInfoKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> schemaKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> eventKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> albumKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> taskKeystoRetain = new ArrayList<>(); + List<AxArtifactKey> policyKeystoRetain = new ArrayList<>(); + + apexMainMap.values().forEach(main -> { + inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet()); + outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet()); + taskParametersToRetain.addAll( + main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters()); + executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getExecutorParameterMap().keySet()); + schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters() + .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()); + + keyInfoKeystoRetain + .addAll(main.getActivator().getPolicyModel().getKeyInformation().getKeyInfoMap().keySet()); + schemaKeystoRetain.addAll(main.getActivator().getPolicyModel().getSchemas().getSchemasMap().keySet()); + eventKeystoRetain.addAll(main.getActivator().getPolicyModel().getEvents().getEventMap().keySet()); + albumKeystoRetain.addAll(main.getActivator().getPolicyModel().getAlbums().getAlbumsMap().keySet()); + taskKeystoRetain.addAll(main.getActivator().getPolicyModel().getTasks().getTaskMap().keySet()); + policyKeystoRetain.addAll(main.getActivator().getPolicyModel().getPolicies().getPolicyMap().keySet()); + }); + for (ApexMain main : undeployedPoliciesMainMap.values()) { + ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + List<String> eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet() + .stream().filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + List<String> eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet() + .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList()); + eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove); + eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove); + EngineParameters engineParameters = + main.getApexParameters().getEngineServiceParameters().getEngineParameters(); + final List<TaskParameters> taskParametersToRemove = engineParameters.getTaskParameters().stream() + .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList()); + final List<String> executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream() + .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList()); + final List<String> schemaParamKeysToRemove = + engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet() + .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList()); + EngineParameters aggregatedEngineParameters = + existingParameters.getEngineServiceParameters().getEngineParameters(); + aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove); + executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove); + schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters() + .getSchemaHelperParameterMap()::remove); + + final AxPolicyModel policyModel = main.getActivator().getPolicyModel(); + final List<AxArtifactKey> keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet() + .stream().filter(key -> !keyInfoKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream() + .filter(key -> !schemaKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream() + .filter(key -> !eventKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream() + .filter(key -> !albumKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream() + .filter(key -> !taskKeystoRetain.contains(key)).collect(Collectors.toList()); + final List<AxArtifactKey> policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream() + .filter(key -> !policyKeystoRetain.contains(key)).collect(Collectors.toList()); + + final Map<AxArtifactKey, AxKeyInfo> keyInfoMap = + ModelService.getModel(AxKeyInformation.class).getKeyInfoMap(); + final Map<AxArtifactKey, AxContextSchema> schemasMap = + ModelService.getModel(AxContextSchemas.class).getSchemasMap(); + final Map<AxArtifactKey, AxEvent> eventMap = ModelService.getModel(AxEvents.class).getEventMap(); + final Map<AxArtifactKey, AxContextAlbum> albumsMap = + ModelService.getModel(AxContextAlbums.class).getAlbumsMap(); + final Map<AxArtifactKey, AxTask> taskMap = ModelService.getModel(AxTasks.class).getTaskMap(); + final Map<AxArtifactKey, AxPolicy> policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap(); + + keyInfoKeystoRemove.forEach(keyInfoMap::remove); + schemaKeystoRemove.forEach(schemasMap::remove); + eventKeystoRemove.forEach(eventMap::remove); + albumKeystoRemove.forEach(albumsMap::remove); + taskKeystoRemove.forEach(taskMap::remove); + policyKeystoRemove.forEach(policyMap::remove); } } - private Map<ToscaConceptIdentifier, String[]> createPolicyArgsMap(List<ToscaPolicy> policies) + private Map<ToscaConceptIdentifier, ApexMain> initiateApexEngineForPolicies(List<ToscaPolicy> policies) throws ApexStarterException { - Map<ToscaConceptIdentifier, String[]> policyArgsMap = new LinkedHashMap<>(); + Map<ToscaConceptIdentifier, ApexMain> mainMap = new LinkedHashMap<>(); for (ToscaPolicy policy : policies) { String policyName = policy.getIdentifier().getName(); final StandardCoder standardCoder = new StandardCoder(); @@ -103,34 +242,38 @@ public class ApexEngineHandler { throw new ApexStarterException(e); } final String[] apexArgs = {"-p", file.getAbsolutePath()}; - policyArgsMap.put(policy.getIdentifier(), apexArgs); + LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier()); + try { + ApexMain apexMain = new ApexMain(apexArgs); + mainMap.put(policy.getIdentifier(), apexMain); + } catch (Exception e) { + LOGGER.error("Execution of policy {} failed", policy.getIdentifier(), e); + } } - return policyArgsMap; + return mainMap; } /** * Method to get the APEX engine statistics. */ public List<AxEngineModel> getEngineStats() { - List<AxEngineModel> engineStats = null; - if (null != apexMain && apexMain.isAlive()) { - engineStats = apexMain.getEngineStats(); - } - return engineStats; + // engineStats from all the apexMain instances running individual tosca policies are combined here. + return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive())) + .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList()); } /** * Method to check whether the apex engine is running or not. */ public boolean isApexEngineRunning() { - return null != apexMain && apexMain.isAlive(); + return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive())); } /** * Method that return the list of running policies in the apex engine. */ public List<ToscaConceptIdentifier> getRunningPolicies() { - return new ArrayList<>(apexMain.getApexParametersMap().keySet()); + return new ArrayList<>(apexMainMap.keySet()); } /** @@ -139,8 +282,12 @@ public class ApexEngineHandler { public void shutdown() throws ApexStarterException { try { LOGGER.debug("Shutting down apex engine."); - apexMain.shutdown(); - apexMain = null; + for (ApexMain apexMain : apexMainMap.values()) { + apexMain.shutdown(); + } + apexMainMap.clear(); + ModelService.clear(); + ParameterService.clear(); } catch (final ApexException e) { throw new ApexStarterException(e); } |