From 658e67bc821a3bc55f2c6d877e7e0baa21427333 Mon Sep 17 00:00:00 2001 From: "a.sreekumar" Date: Mon, 25 Jan 2021 11:03:42 +0000 Subject: Improve handling of multiple policy in APEX PDP Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39 Issue-ID: POLICY-2883 Signed-off-by: a.sreekumar --- .../apex/service/engine/main/ApexActivator.java | 278 ++++++++++----------- .../service/engine/main/ApexEventUnmarshaller.java | 13 +- .../policy/apex/service/engine/main/ApexMain.java | 175 +++---------- .../service/parameters/ApexParameterHandler.java | 4 +- 4 files changed, 182 insertions(+), 288 deletions(-) (limited to 'services/services-engine/src/main') diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index cadcc7ee8..3451c120c 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,35 +22,41 @@ package org.onap.policy.apex.service.engine.main; -import java.util.AbstractMap; -import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo; +import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicies; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicy; import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; +import org.onap.policy.apex.model.policymodel.concepts.AxTasks; import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.runtime.EngineService; import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.onap.policy.common.parameters.ParameterService; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -66,13 +72,12 @@ public class ApexActivator { // The logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class); - // The parameters of the Apex activator when running with multiple policies @Getter @Setter - private Map apexParametersMap; + private ApexParameters apexParameters; @Getter - Map policyModelsMap; + private AxPolicyModel policyModel; // Event unmarshalers are used to receive events asynchronously into Apex private final Map unmarshallerMap = new LinkedHashMap<>(); @@ -93,10 +98,10 @@ public class ApexActivator { /** * Instantiate the activator for the Apex engine as a complete service. * - * @param parametersMap the apex parameters map for the Apex service + * @param parameters the apex parameters for the Apex service */ - public ApexActivator(Map parametersMap) { - apexParametersMap = parametersMap; + public ApexActivator(ApexParameters parameters) { + apexParameters = parameters; } /** @@ -108,96 +113,131 @@ public class ApexActivator { LOGGER.debug("Apex engine starting as a service . . ."); try { - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - // totalInstanceCount is the sum of instance counts required as per each policy - int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); - apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); - engineKey = apexParameters.getEngineServiceParameters().getEngineKey(); instantiateEngine(apexParameters); setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { - LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); + try { + terminate(); + } catch (ApexException e1) { + LOGGER.error("Terminating the ApexActivator encountered error", e1); + } throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); } LOGGER.debug("Apex engine started as a service"); } - private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { - policyModelsMap = new LinkedHashMap<>(); - Map inputParametersMap = new LinkedHashMap<>(); - Map outputParametersMap = new LinkedHashMap<>(); - Set> apexParamsEntrySet = new LinkedHashSet<>( - apexParametersMap.entrySet()); - apexParamsEntrySet.stream().forEach(apexParamsEntry -> { - ApexParameters apexParams = apexParamsEntry.getValue(); - List duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet()); - duplicateInputParameters.retainAll(inputParametersMap.keySet()); - List duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet()); - duplicateOutputParameters.retainAll(outputParametersMap.keySet()); - - if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) { - LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.", - duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(), - apexParamsEntry.getKey().getVersion()); - apexParametersMap.remove(apexParamsEntry.getKey()); - return; - } - inputParametersMap.putAll(apexParams.getEventInputParameters()); - outputParametersMap.putAll(apexParams.getEventOutputParameters()); - // Check if a policy model file has been specified - if (apexParams.getEngineServiceParameters().getPolicyModel() != null) { - LOGGER.debug("deploying policy model to the apex engines . . ."); - try { - final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel(); - AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString); - policyModelsMap.put(apexParamsEntry.getKey(), policyModel); - } catch (ApexException e) { - throw new ApexRuntimeException("Failed to create the apex model.", e); - } - } - }); - AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + private void instantiateEngine(ApexParameters apexParameters) throws ApexException { + if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { + throw new ApexException("Apex Engine already initialized."); + } + // Create engine with specified thread count + LOGGER.debug("starting apex engine service . . ."); + apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters()); + + // Create the engine holder to hold the engine's references and act as an event + // receiver + engineServiceHandler = new ApexEngineServiceHandler(apexEngineService); + } + + private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { + AxPolicyModel model; + try { + final String policyModelString = apexParameters.getEngineServiceParameters().getPolicyModel(); + model = EngineServiceImpl.createModel(apexParameters.getEngineServiceParameters().getEngineKey(), + policyModelString); + } catch (ApexException e) { + throw new ApexRuntimeException("Failed to create the apex model.", e); + } + AxKeyInformation existingKeyInformation = null; + AxContextSchemas existingSchemas = null; + AxEvents existingEvents = null; + AxContextAlbums existingAlbums = null; + AxTasks existingTasks = null; + AxPolicies existingPolicies = null; + if (ModelService.existsModel(AxPolicyModel.class)) { + existingKeyInformation = new AxKeyInformation(ModelService.getModel(AxKeyInformation.class)); + existingSchemas = new AxContextSchemas(ModelService.getModel(AxContextSchemas.class)); + existingEvents = new AxEvents(ModelService.getModel(AxEvents.class)); + existingAlbums = new AxContextAlbums(ModelService.getModel(AxContextAlbums.class)); + existingTasks = new AxTasks(ModelService.getModel(AxTasks.class)); + existingPolicies = new AxPolicies(ModelService.getModel(AxPolicies.class)); + } // Set the policy model in the engine - apexEngineService.updateModel(engineKey, finalPolicyModel, true); + try { + apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), model, true); + policyModel = new AxPolicyModel(model); + } catch (ApexException exp) { + // Discard concepts from the current policy + if (null != existingKeyInformation) { + // Update model service with other policies' concepts. + ModelService.registerModel(AxKeyInformation.class, existingKeyInformation); + ModelService.registerModel(AxContextSchemas.class, existingSchemas); + ModelService.registerModel(AxEvents.class, existingEvents); + ModelService.registerModel(AxContextAlbums.class, existingAlbums); + ModelService.registerModel(AxTasks.class, existingTasks); + ModelService.registerModel(AxPolicies.class, existingPolicies); + } else { + ModelService.clear(); + } + throw exp; + } + if (null != existingKeyInformation) { + // Make sure all concepts in previously deployed policies are retained in ModelService + // during multi policy deployment. + updateModelService(existingKeyInformation, existingSchemas, existingEvents, existingAlbums, existingTasks, + existingPolicies); + } - handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap); - setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), + apexParameters.getEventInputParameters(), apexParameters.getEventOutputParameters()); // Wire up pairings between marhsallers and unmarshallers - setUpMarshalerPairings(inputParametersMap); + setUpMarshalerPairings(apexParameters.getEventInputParameters()); // Start event processing - startUnmarshallers(inputParametersMap); + startUnmarshallers(apexParameters.getEventInputParameters()); } - private AxPolicyModel aggregatePolicyModels(Map policyModelsMap) { - // Doing a deep copy so that original values in policyModelsMap is retained - // after reduction operation - Set> policyModelsEntries = policyModelsMap.entrySet().stream() - .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet()); - Optional> finalPolicyModelEntry = policyModelsEntries.stream() - .reduce((entry1, entry2) -> { - try { - entry1.setValue( - PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - }); - AxPolicyModel finalPolicyModel = null; - if (finalPolicyModelEntry.isPresent()) { - finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue()); + private void updateModelService(AxKeyInformation existingKeyInformation, + AxContextSchemas existingSchemas, AxEvents existingEvents, + AxContextAlbums existingAlbums, AxTasks existingTasks, + AxPolicies existingPolicies) throws ApexModelException { + + AxContextSchemas axContextSchemas = ModelService.getModel(AxContextSchemas.class); + AxEvents axEvents = ModelService.getModel(AxEvents.class); + AxContextAlbums axContextAlbums = ModelService.getModel(AxContextAlbums.class); + AxTasks axTasks = ModelService.getModel(AxTasks.class); + AxPolicies axPolicies = ModelService.getModel(AxPolicies.class); + + Map newSchemasMap = axContextSchemas.getSchemasMap(); + Map newEventsMap = axEvents.getEventMap(); + Map newAlbumsMap = axContextAlbums.getAlbumsMap(); + Map newTasksMap = axTasks.getTaskMap(); + Map newPoliciesMap = axPolicies.getPolicyMap(); + + StringBuilder errorMessage = new StringBuilder(); + PolicyModelMerger.checkForDuplicateItem(existingSchemas.getSchemasMap(), newSchemasMap, errorMessage, "schema"); + PolicyModelMerger.checkForDuplicateItem(existingEvents.getEventMap(), newEventsMap, errorMessage, "event"); + PolicyModelMerger.checkForDuplicateItem(existingAlbums.getAlbumsMap(), newAlbumsMap, errorMessage, "album"); + PolicyModelMerger.checkForDuplicateItem(existingTasks.getTaskMap(), newTasksMap, errorMessage, "task"); + PolicyModelMerger.checkForDuplicateItem(existingPolicies.getPolicyMap(), newPoliciesMap, errorMessage, + "policy"); + if (errorMessage.length() > 0) { + throw new ApexModelException(errorMessage.toString()); } - return finalPolicyModel; + + AxKeyInformation axKeyInformation = ModelService.getModel(AxKeyInformation.class); + Map newKeyInfoMap = axKeyInformation.getKeyInfoMap(); + // Now add all the concepts that must be copied over + newKeyInfoMap.putAll(existingKeyInformation.getKeyInfoMap()); + newSchemasMap.putAll(existingSchemas.getSchemasMap()); + newEventsMap.putAll(existingEvents.getEventMap()); + newAlbumsMap.putAll(existingAlbums.getAlbumsMap()); + newTasksMap.putAll(existingTasks.getTaskMap()); + newPoliciesMap.putAll(existingPolicies.getPolicyMap()); } private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, @@ -226,38 +266,6 @@ public class ApexActivator { } } - private void handleExistingMarshallerAndUnmarshaller(Map inputParametersMap, - Map outputParametersMap) { - // stop and remove any marshaller/unmarshaller that is part of a policy that is - // undeployed - marshallerMap.entrySet().stream() - .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey())) - .forEach(marshallerEntry -> marshallerEntry.getValue().stop()); - marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey)); - unmarshallerMap.entrySet().stream() - .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey())) - .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop()); - unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey)); - - // If a marshaller/unmarshaller is already initialized, they don't need to be - // reinitialized during model update. - outputParametersMap.keySet().removeIf(marshallerMap::containsKey); - inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey); - } - - private void instantiateEngine(ApexParameters apexParameters) throws ApexException { - if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { - throw new ApexException("Apex Engine already initialized."); - } - // Create engine with specified thread count - LOGGER.debug("starting apex engine service . . ."); - apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters()); - - // Create the engine holder to hold the engine's references and act as an event - // receiver - engineServiceHandler = new ApexEngineServiceHandler(apexEngineService); - } - /** * Set up unmarshaler/marshaler pairing for synchronized event handling. We only * need to traverse the unmarshalers because the unmarshalers and marshalers are @@ -275,8 +283,8 @@ public class ApexActivator { // Check if the unmarshaler is synchronized with a marshaler if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler - final ApexEventMarshaller peeredMarshaler = marshallerMap - .get(inputParameters.getValue().getPeer(peeredMode)); + final ApexEventMarshaller peeredMarshaler = + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); @@ -297,22 +305,6 @@ public class ApexActivator { } } - /** - * Updates the APEX Engine with the model created from new Policies. - * - * @param apexParamsMap the apex parameters map for the Apex service - * @throws ApexException on errors - */ - public void updateModel(Map apexParamsMap) throws ApexException { - try { - ApexParameters apexParameters = apexParamsMap.values().iterator().next(); - setUpModelMarshallerAndUnmarshaller(apexParameters); - } catch (final Exception e) { - LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); - throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); - } - } - /** * Get the Apex engine worker stats. */ @@ -338,19 +330,25 @@ public class ApexActivator { engineServiceHandler.terminate(); engineServiceHandler = null; } - - // Clear the services - ModelService.clear(); - ParameterService.clear(); + // Clear the services in case if this was the only policy running in the engine + ApexParameters apexParams = null; + if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) { + apexParams = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + } + if (null != apexParams + && apexParameters.getEventInputParameters().size() == apexParams.getEventInputParameters().size()) { + ModelService.clear(); + ParameterService.clear(); + } } /** * Shuts down all marshallers and unmarshallers. */ private void shutdownMarshallerAndUnmarshaller() { - marshallerMap.values().forEach(ApexEventMarshaller::stop); - marshallerMap.clear(); unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop); unmarshallerMap.clear(); + marshallerMap.values().forEach(ApexEventMarshaller::stop); + marshallerMap.clear(); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java index fd6ac4489..fdc404c67 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -347,17 +347,16 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable { // Order the stop stopOrderedFlag = true; - // Order a stop on the synchronous cache if one exists - if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS) - && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) { - ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop(); - } - // Wait for thread shutdown while (unmarshallerThread != null && unmarshallerThread.isAlive()) { ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL); } + // Order a stop on the synchronous cache if one exists + if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS) + && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) { + ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop(); + } LOGGER.exit("shut down Apex event unmarshaller"); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java index 5707c95ad..65c2acffa 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modification Copyright (C) 2019-2021 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,26 +24,20 @@ package org.onap.policy.apex.service.engine.main; import java.util.Arrays; import java.util.Base64; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Map.Entry; -import java.util.TreeMap; +import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; -import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; -import org.onap.policy.apex.model.basicmodel.service.ModelService; -import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; -import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; -import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.service.parameters.ApexParameterConstants; import org.onap.policy.apex.service.parameters.ApexParameterHandler; import org.onap.policy.apex.service.parameters.ApexParameters; +import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.common.parameters.ParameterService; -import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -58,14 +52,12 @@ public class ApexMain { private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexMain.class); // The Apex Activator that activates the Apex engine + @Getter private ApexActivator activator; // The parameters read in from JSON for each policy @Getter - private Map apexParametersMap; - - //engineParameters are aggregated in case of multiple policies - private EngineParameters aggregatedEngineParameters; + private ApexParameters apexParameters; private ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); @@ -77,73 +69,27 @@ public class ApexMain { * Instantiates the Apex service. * * @param args the command line arguments + * @throws ApexException the apex exception. */ - public ApexMain(final String[] args) { + public ApexMain(final String[] args) throws ApexException { LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . ."); - apexParametersMap = new LinkedHashMap<>(); - aggregatedEngineParameters = new EngineParameters(); try { - apexParametersMap.put(new ToscaConceptIdentifier(), populateApexParameters(args)); + apexParameters = populateApexParameters(args); } catch (ApexException e) { LOGGER.error(APEX_SERVICE_FAILED_MSG, e); return; } - apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next()); - // Now, create the activator for the Apex service - activator = new ApexActivator(apexParametersMap); - - // Start the activator - try { - activator.initialize(); - setAlive(true); - } catch (final ApexActivatorException e) { - LOGGER.error("start of Apex service failed, used parameters are " + Arrays.toString(args), e); - return; - } - - // Add a shutdown hook to shut everything down in an orderly manner - Runtime.getRuntime().addShutdownHook(new ApexMainShutdownHookClass()); - LOGGER.exit("Started Apex"); - } - - /** - * Instantiates the Apex service for multiple policies. - * - * @param policyArgumentsMap the map with command line arguments as value and policy-id as key - * @throws ApexException on errors - */ - public ApexMain(Map policyArgumentsMap) throws ApexException { - apexParametersMap = new LinkedHashMap<>(); - aggregatedEngineParameters = new EngineParameters(); - for (Entry policyArgsEntry: policyArgumentsMap.entrySet()) { - try { - apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); - } catch (ApexException e) { - LOGGER.error("Invalid arguments specified for policy - " + policyArgsEntry.getKey().getName() + ":" - + policyArgsEntry.getKey().getVersion(), e); - } - } - if (apexParametersMap.isEmpty()) { - LOGGER.error(APEX_SERVICE_FAILED_MSG); - return; - } + aggregateParametersAndRegister(); - // Set the aggregated engineParameters which will be used later while creating the engine - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); - apexParameterHandler.registerParameters(apexParameters); // Now, create the activator for the Apex service - activator = new ApexActivator(apexParametersMap); + activator = new ApexActivator(apexParameters); // Start the activator try { activator.initialize(); - apexParametersMap = activator.getApexParametersMap(); setAlive(true); } catch (final ApexActivatorException e) { - LOGGER.error(APEX_SERVICE_FAILED_MSG, e); - activator.terminate(); - return; + throw new ApexException("start of Apex service failed, used parameters are " + Arrays.toString(args), e); } // Add a shutdown hook to shut everything down in an orderly manner @@ -151,75 +97,6 @@ public class ApexMain { LOGGER.exit("Started Apex"); } - /** - * Updates the APEX Engine with the model created from new Policies. - * - * @param policyArgsMap the map with command line arguments as value and policy-id as key - * @throws ApexException on errors - */ - public void updateModel(Map policyArgsMap) throws ApexException { - // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine - boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey); - apexParametersMap.clear(); - aggregatedEngineParameters = new EngineParameters(); - AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class); - Map albumsMap = new TreeMap<>(); - for (Entry policyArgsEntry : policyArgsMap.entrySet()) { - findAlbumsToHold(albumsMap, policyArgsEntry.getKey()); - try { - apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); - } catch (ApexException e) { - LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(), - policyArgsEntry.getKey().getVersion(), e); - } - } - // Set the aggregated engineParameters - ApexParameters apexParameters = apexParametersMap.values().iterator().next(); - apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); - ParameterService.clear(); - apexParameterHandler.registerParameters(apexParameters); - try { - if (albumsMap.isEmpty() && !isAnyPolicyDeployed) { - // clear context since none of the policies' context albums has to be retained - // this could be because all policies have a major version change, - // or a full set of new policies are received in the update message - activator.terminate(); - // ParameterService is cleared when activator is terminated. Register the parameters again in this case - apexParameterHandler.registerParameters(apexParameters); - activator = new ApexActivator(apexParametersMap); - activator.initialize(); - setAlive(true); - } else { - albums.setAlbumsMap(albumsMap); - activator.setApexParametersMap(apexParametersMap); - activator.updateModel(apexParametersMap); - } - } catch (final ApexException e) { - LOGGER.error(APEX_SERVICE_FAILED_MSG, e); - activator.terminate(); - throw new ApexException(e.getMessage()); - } - } - - /** - * Find the context albums which should be retained when updating the policies. - * - * @param albumsMap the albums which should be kept during model update - * @param policyId the policy id of current policy - */ - private void findAlbumsToHold(Map albumsMap, ToscaConceptIdentifier policyId) { - for (Entry policyModelsEntry : activator.getPolicyModelsMap() - .entrySet()) { - // If a policy with the same major version is received in PDP_UPDATE, - // context for that policy has to be retained. For this, take such policies' albums - if (policyModelsEntry.getKey().getName().equals(policyId.getName()) - && policyModelsEntry.getKey().getVersion().split("\\.")[0] - .equals(policyId.getVersion().split("\\.")[0])) { - albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap()); - } - } - } - private ApexParameters populateApexParameters(String[] args) throws ApexException { // Check the arguments final ApexCommandLineArguments arguments = new ApexCommandLineArguments(); @@ -263,11 +140,32 @@ public class ApexMain { ehParameterEntry.getValue().setName(ehParameterEntry.getKey()); } } - aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters()); return axParameters; } - private void aggregateEngineParameters(EngineParameters engineParameters) { + private void aggregateParametersAndRegister() throws ApexException { + ApexParameters aggregatedParameters = null; + if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) { + aggregatedParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME); + } else { + aggregatedParameters = new ApexParameters(); + aggregatedParameters.setEngineServiceParameters(new EngineServiceParameters()); + apexParameterHandler.registerParameters(aggregatedParameters); + } + List duplicateInputParameters = aggregatedParameters.getEventInputParameters().keySet().stream() + .filter(apexParameters.getEventInputParameters()::containsKey).collect(Collectors.toList()); + List duplicateOutputParameters = aggregatedParameters.getEventOutputParameters().keySet().stream() + .filter(apexParameters.getEventOutputParameters()::containsKey).collect(Collectors.toList()); + if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) { + throw new ApexException( + "start of Apex service failed because this policy has the following duplicate I/O parameters: " + + duplicateInputParameters + "/" + duplicateOutputParameters); + } + aggregatedParameters.getEventInputParameters().putAll(apexParameters.getEventInputParameters()); + aggregatedParameters.getEventOutputParameters().putAll(apexParameters.getEventOutputParameters()); + EngineParameters aggregatedEngineParameters = + aggregatedParameters.getEngineServiceParameters().getEngineParameters(); + EngineParameters engineParameters = apexParameters.getEngineServiceParameters().getEngineParameters(); aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters()); aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap()); aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap() @@ -346,8 +244,9 @@ public class ApexMain { * The main method. * * @param args the arguments + * @throws ApexException the apex exception. */ - public static void main(final String[] args) { + public static void main(final String[] args) throws ApexException { new ApexMain(args); } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java index 9a07b4ad5..673d9cab5 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. - * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,8 +73,6 @@ public class ApexParameterHandler { */ public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException { - ParameterService.clear(); - ApexParameters parameters = null; String toscaPolicyFilePath = arguments.getToscaPolicyFilePath(); // Read the parameters -- cgit 1.2.3-korg