diff options
author | a.sreekumar <ajith.sreekumar@bell.ca> | 2020-07-23 10:36:32 +0100 |
---|---|---|
committer | a.sreekumar <ajith.sreekumar@bell.ca> | 2020-07-24 17:13:47 +0100 |
commit | f68190af4f0251934734841352a77758ba53653e (patch) | |
tree | 27dcbdf0399740f307fee1fc0937afb18b52dd41 /services/services-engine/src | |
parent | 2169b3fbf66153ea7e066ba0057585822208d6f4 (diff) |
APEX changes to support policy disable/enable and some improvements
1) Do not stop all the marshallers/unmarshallers while updating the
engine. Stop and remove only those that are part of the policies that
are undeployed.
2) Do not reinitilaize any marshaller/unmarshaller that is already
initilaized as part of the policies which were already deployed.
Initialize only the ones as part of any newly deployed policy.
3) EngineParameters could be different in different policies. Aggregate
these parameters and make it available in the engine for any running policy.
4) Enable support for running policies with same model and different
configurations.
Change-Id: If74807a0515a741ef4e53bd0a93e43b05872f6b5
Issue-ID: POLICY-2536
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services/services-engine/src')
3 files changed, 74 insertions, 42 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 4d84fa394..1e2447b2e 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -36,6 +36,7 @@ import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; @@ -88,6 +89,7 @@ public class ApexActivator { // The engine service private EngineService apexEngineService; + private AxArtifactKey engineKey; /** * Instantiate the activator for the Apex engine as a complete service. @@ -112,8 +114,9 @@ public class ApexActivator { int totalInstanceCount = apexParametersMap.values().stream() .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); + engineKey = apexParameters.getEngineServiceParameters().getEngineKey(); instantiateEngine(apexParameters); - setUpModelMarhsallerAndUnmarshaller(apexParameters); + setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); @@ -122,7 +125,7 @@ public class ApexActivator { LOGGER.debug("Apex engine started as a service"); } - private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { + private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { policyModelsMap = new LinkedHashMap<>(); Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>(); Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>(); @@ -149,8 +152,7 @@ public class ApexActivator { try { final String policyModelString = TextFileUtils .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); - AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } catch (ApexException | IOException e) { throw new ApexRuntimeException("Failed to create the apex model.", e); @@ -160,10 +162,10 @@ public class ApexActivator { AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); // Set the policy model in the engine - apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + apexEngineService.updateModel(engineKey, finalPolicyModel, true); - setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, + handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap); + setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, outputParametersMap); // Wire up pairings between marhsallers and unmarshallers @@ -197,9 +199,10 @@ public class ApexActivator { return finalPolicyModel; } - private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, + private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException { + // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) { @@ -220,9 +223,25 @@ public class ApexActivator { } } + private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap, + Map<String, EventHandlerParameters> outputParametersMap) { + // stop and remove any marshaller/unmarshaller that is part of a policy that is undeployed + marshallerMap.entrySet().stream() + .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey())) + .forEach(marshallerEntry -> marshallerEntry.getValue().stop()); + marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey)); + unmarshallerMap.entrySet().stream() + .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey())) + .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop()); + unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey)); + + // If a marshaller/unmarshaller is already initialized, they don't need to be reinitialized during model update. + outputParametersMap.keySet().removeIf(marshallerMap::containsKey); + inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey); + } + private void instantiateEngine(ApexParameters apexParameters) throws ApexException { - if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -284,9 +303,8 @@ public class ApexActivator { */ public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException { try { - shutdownMarshallerAndUnmarshaller(); ApexParameters apexParameters = apexParamsMap.values().iterator().next(); - setUpModelMarhsallerAndUnmarshaller(apexParameters); + setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java index 0e38230be..459cdff24 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modification Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,7 @@ import java.util.Map.Entry; import java.util.TreeMap; import lombok.Getter; import lombok.Setter; +import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.service.ModelService; @@ -40,6 +42,7 @@ import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; import org.onap.policy.apex.service.parameters.ApexParameterHandler; import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -61,6 +64,11 @@ public class ApexMain { @Getter private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap; + //engineParameters are aggregated in case of multiple policies + private EngineParameters aggregatedEngineParameters; + + private ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); + @Getter @Setter(lombok.AccessLevel.PRIVATE) private volatile boolean alive = false; @@ -73,13 +81,14 @@ public class ApexMain { public ApexMain(final String[] args) { LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . ."); apexParametersMap = new LinkedHashMap<>(); + aggregatedEngineParameters = new EngineParameters(); try { apexParametersMap.put(new ToscaPolicyIdentifier(), populateApexParameters(args)); } catch (ApexException e) { LOGGER.error(APEX_SERVICE_FAILED_MSG, e); return; } - + apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next()); // Now, create the activator for the Apex service activator = new ApexActivator(apexParametersMap); @@ -105,6 +114,7 @@ public class ApexMain { */ public ApexMain(Map<ToscaPolicyIdentifier, String[]> policyArgumentsMap) throws ApexException { apexParametersMap = new LinkedHashMap<>(); + aggregatedEngineParameters = new EngineParameters(); for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) { try { apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); @@ -117,6 +127,11 @@ public class ApexMain { LOGGER.error(APEX_SERVICE_FAILED_MSG); return; } + + // Set the aggregated engineParameters which will be used later while creating the engine + ApexParameters apexParameters = apexParametersMap.values().iterator().next(); + apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); + apexParameterHandler.registerParameters(apexParameters); // Now, create the activator for the Apex service activator = new ApexActivator(apexParametersMap); @@ -143,7 +158,10 @@ public class ApexMain { * @throws ApexException on errors */ public void updateModel(Map<ToscaPolicyIdentifier, String[]> policyArgsMap) throws ApexException { + // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine + boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey); apexParametersMap.clear(); + aggregatedEngineParameters = new EngineParameters(); AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class); Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>(); for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) { @@ -155,14 +173,19 @@ public class ApexMain { policyArgsEntry.getKey().getVersion(), e); } } + // Set the aggregated engineParameters + ApexParameters apexParameters = apexParametersMap.values().iterator().next(); + apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); + ParameterService.clear(); + apexParameterHandler.registerParameters(apexParameters); try { - if (albumsMap.isEmpty()) { + if (albumsMap.isEmpty() && !isAnyPolicyDeployed) { // clear context since none of the policies' context albums has to be retained // this could be because all policies have a major version change, // or a full set of new policies are received in the update message activator.terminate(); - // ParameterService is cleared when activator is terminated. Register the engine parameters in this case - new ApexParameterHandler().registerParameters(apexParametersMap.values().iterator().next()); + // ParameterService is cleared when activator is terminated. Register the parameters again in this case + apexParameterHandler.registerParameters(apexParameters); activator = new ApexActivator(apexParametersMap); activator.initialize(); setAlive(true); @@ -218,10 +241,6 @@ public class ApexMain { ApexParameters axParameters; // Read the parameters try { - ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); - // In case of multiple policies received from PAP, do not clear ParameterService if parameters of one policy - // already registered - apexParameterHandler.setKeepParameterServiceFlag(null != apexParametersMap && !apexParametersMap.isEmpty()); axParameters = apexParameterHandler.getParameters(arguments); } catch (final Exception e) { LOGGER.error("Cannot create APEX Parameters from the arguments provided.", e); @@ -244,9 +263,17 @@ public class ApexMain { ehParameterEntry.getValue().setName(ehParameterEntry.getKey()); } } + aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters()); return axParameters; } + private void aggregateEngineParameters(EngineParameters engineParameters) { + aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters()); + aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap()); + aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap() + .putAll(engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap()); + } + /** * Shut down Execution. * diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java index 02d39733c..f88733f60 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java @@ -2,19 +2,20 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -24,7 +25,6 @@ package org.onap.policy.apex.service.parameters; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import java.io.FileReader; -import lombok.Setter; import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.service.engine.main.ApexCommandLineArguments; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; @@ -46,9 +46,6 @@ import org.slf4j.ext.XLoggerFactory; public class ApexParameterHandler { private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexParameterHandler.class); - @Setter - private boolean keepParameterServiceFlag; - /** * Read the parameters from the parameter file. * @@ -57,11 +54,8 @@ public class ApexParameterHandler { * @throws ParameterException on parameter exceptions */ public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException { - // when populating parameters for multiple policies, do not clear the ParameterService already registered - // otherwise clear all existing parameters - if (!keepParameterServiceFlag) { - ParameterService.clear(); - } + + ParameterService.clear(); ApexParameters parameters = null; @@ -70,11 +64,11 @@ public class ApexParameterHandler { // Register the adapters for our carrier technologies and event protocols with GSON // @formatter:off final Gson gson = new GsonBuilder() - .registerTypeAdapter(EngineParameters.class, + .registerTypeAdapter(EngineParameters.class, new EngineServiceParametersJsonAdapter()) - .registerTypeAdapter(CarrierTechnologyParameters.class, + .registerTypeAdapter(CarrierTechnologyParameters.class, new CarrierTechnologyParametersJsonAdapter()) - .registerTypeAdapter(EventProtocolParameters.class, + .registerTypeAdapter(EventProtocolParameters.class, new EventProtocolParametersJsonAdapter()) .create(); // @formatter:on @@ -118,13 +112,6 @@ public class ApexParameterHandler { LOGGER.info(returnMessage); } - // engine parameters in multiple policies are expected to be same. - // no need to do registration if already registered - if (!keepParameterServiceFlag) { - // Register the parameters with the parameter service - registerParameters(parameters); - } - return parameters; } |