diff options
Diffstat (limited to 'services/services-engine/src/main/java/org')
3 files changed, 74 insertions, 42 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 4d84fa394..1e2447b2e 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -36,6 +36,7 @@ import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; @@ -88,6 +89,7 @@ public class ApexActivator { // The engine service private EngineService apexEngineService; + private AxArtifactKey engineKey; /** * Instantiate the activator for the Apex engine as a complete service. @@ -112,8 +114,9 @@ public class ApexActivator { int totalInstanceCount = apexParametersMap.values().stream() .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); + engineKey = apexParameters.getEngineServiceParameters().getEngineKey(); instantiateEngine(apexParameters); - setUpModelMarhsallerAndUnmarshaller(apexParameters); + setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); @@ -122,7 +125,7 @@ public class ApexActivator { LOGGER.debug("Apex engine started as a service"); } - private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { + private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException { policyModelsMap = new LinkedHashMap<>(); Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>(); Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>(); @@ -149,8 +152,7 @@ public class ApexActivator { try { final String policyModelString = TextFileUtils .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); - AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } catch (ApexException | IOException e) { throw new ApexRuntimeException("Failed to create the apex model.", e); @@ -160,10 +162,10 @@ public class ApexActivator { AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); // Set the policy model in the engine - apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + apexEngineService.updateModel(engineKey, finalPolicyModel, true); - setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, + handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap); + setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, outputParametersMap); // Wire up pairings between marhsallers and unmarshallers @@ -197,9 +199,10 @@ public class ApexActivator { return finalPolicyModel; } - private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, + private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException { + // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) { @@ -220,9 +223,25 @@ public class ApexActivator { } } + private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap, + Map<String, EventHandlerParameters> outputParametersMap) { + // stop and remove any marshaller/unmarshaller that is part of a policy that is undeployed + marshallerMap.entrySet().stream() + .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey())) + .forEach(marshallerEntry -> marshallerEntry.getValue().stop()); + marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey)); + unmarshallerMap.entrySet().stream() + .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey())) + .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop()); + unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey)); + + // If a marshaller/unmarshaller is already initialized, they don't need to be reinitialized during model update. + outputParametersMap.keySet().removeIf(marshallerMap::containsKey); + inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey); + } + private void instantiateEngine(ApexParameters apexParameters) throws ApexException { - if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -284,9 +303,8 @@ public class ApexActivator { */ public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException { try { - shutdownMarshallerAndUnmarshaller(); ApexParameters apexParameters = apexParamsMap.values().iterator().next(); - setUpModelMarhsallerAndUnmarshaller(apexParameters); + setUpModelMarshallerAndUnmarshaller(apexParameters); } catch (final Exception e) { LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java index 0e38230be..459cdff24 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modification Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +31,7 @@ import java.util.Map.Entry; import java.util.TreeMap; import lombok.Getter; import lombok.Setter; +import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.service.ModelService; @@ -40,6 +42,7 @@ import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; import org.onap.policy.apex.service.parameters.ApexParameterHandler; import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -61,6 +64,11 @@ public class ApexMain { @Getter private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap; + //engineParameters are aggregated in case of multiple policies + private EngineParameters aggregatedEngineParameters; + + private ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); + @Getter @Setter(lombok.AccessLevel.PRIVATE) private volatile boolean alive = false; @@ -73,13 +81,14 @@ public class ApexMain { public ApexMain(final String[] args) { LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . ."); apexParametersMap = new LinkedHashMap<>(); + aggregatedEngineParameters = new EngineParameters(); try { apexParametersMap.put(new ToscaPolicyIdentifier(), populateApexParameters(args)); } catch (ApexException e) { LOGGER.error(APEX_SERVICE_FAILED_MSG, e); return; } - + apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next()); // Now, create the activator for the Apex service activator = new ApexActivator(apexParametersMap); @@ -105,6 +114,7 @@ public class ApexMain { */ public ApexMain(Map<ToscaPolicyIdentifier, String[]> policyArgumentsMap) throws ApexException { apexParametersMap = new LinkedHashMap<>(); + aggregatedEngineParameters = new EngineParameters(); for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) { try { apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); @@ -117,6 +127,11 @@ public class ApexMain { LOGGER.error(APEX_SERVICE_FAILED_MSG); return; } + + // Set the aggregated engineParameters which will be used later while creating the engine + ApexParameters apexParameters = apexParametersMap.values().iterator().next(); + apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); + apexParameterHandler.registerParameters(apexParameters); // Now, create the activator for the Apex service activator = new ApexActivator(apexParametersMap); @@ -143,7 +158,10 @@ public class ApexMain { * @throws ApexException on errors */ public void updateModel(Map<ToscaPolicyIdentifier, String[]> policyArgsMap) throws ApexException { + // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine + boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey); apexParametersMap.clear(); + aggregatedEngineParameters = new EngineParameters(); AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class); Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>(); for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) { @@ -155,14 +173,19 @@ public class ApexMain { policyArgsEntry.getKey().getVersion(), e); } } + // Set the aggregated engineParameters + ApexParameters apexParameters = apexParametersMap.values().iterator().next(); + apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters); + ParameterService.clear(); + apexParameterHandler.registerParameters(apexParameters); try { - if (albumsMap.isEmpty()) { + if (albumsMap.isEmpty() && !isAnyPolicyDeployed) { // clear context since none of the policies' context albums has to be retained // this could be because all policies have a major version change, // or a full set of new policies are received in the update message activator.terminate(); - // ParameterService is cleared when activator is terminated. Register the engine parameters in this case - new ApexParameterHandler().registerParameters(apexParametersMap.values().iterator().next()); + // ParameterService is cleared when activator is terminated. Register the parameters again in this case + apexParameterHandler.registerParameters(apexParameters); activator = new ApexActivator(apexParametersMap); activator.initialize(); setAlive(true); @@ -218,10 +241,6 @@ public class ApexMain { ApexParameters axParameters; // Read the parameters try { - ApexParameterHandler apexParameterHandler = new ApexParameterHandler(); - // In case of multiple policies received from PAP, do not clear ParameterService if parameters of one policy - // already registered - apexParameterHandler.setKeepParameterServiceFlag(null != apexParametersMap && !apexParametersMap.isEmpty()); axParameters = apexParameterHandler.getParameters(arguments); } catch (final Exception e) { LOGGER.error("Cannot create APEX Parameters from the arguments provided.", e); @@ -244,9 +263,17 @@ public class ApexMain { ehParameterEntry.getValue().setName(ehParameterEntry.getKey()); } } + aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters()); return axParameters; } + private void aggregateEngineParameters(EngineParameters engineParameters) { + aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters()); + aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap()); + aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap() + .putAll(engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap()); + } + /** * Shut down Execution. * diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java index 02d39733c..f88733f60 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java @@ -2,19 +2,20 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -24,7 +25,6 @@ package org.onap.policy.apex.service.parameters; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import java.io.FileReader; -import lombok.Setter; import org.onap.policy.apex.core.engine.EngineParameters; import org.onap.policy.apex.service.engine.main.ApexCommandLineArguments; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; @@ -46,9 +46,6 @@ import org.slf4j.ext.XLoggerFactory; public class ApexParameterHandler { private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexParameterHandler.class); - @Setter - private boolean keepParameterServiceFlag; - /** * Read the parameters from the parameter file. * @@ -57,11 +54,8 @@ public class ApexParameterHandler { * @throws ParameterException on parameter exceptions */ public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException { - // when populating parameters for multiple policies, do not clear the ParameterService already registered - // otherwise clear all existing parameters - if (!keepParameterServiceFlag) { - ParameterService.clear(); - } + + ParameterService.clear(); ApexParameters parameters = null; @@ -70,11 +64,11 @@ public class ApexParameterHandler { // Register the adapters for our carrier technologies and event protocols with GSON // @formatter:off final Gson gson = new GsonBuilder() - .registerTypeAdapter(EngineParameters.class, + .registerTypeAdapter(EngineParameters.class, new EngineServiceParametersJsonAdapter()) - .registerTypeAdapter(CarrierTechnologyParameters.class, + .registerTypeAdapter(CarrierTechnologyParameters.class, new CarrierTechnologyParametersJsonAdapter()) - .registerTypeAdapter(EventProtocolParameters.class, + .registerTypeAdapter(EventProtocolParameters.class, new EventProtocolParametersJsonAdapter()) .create(); // @formatter:on @@ -118,13 +112,6 @@ public class ApexParameterHandler { LOGGER.info(returnMessage); } - // engine parameters in multiple policies are expected to be same. - // no need to do registration if already registered - if (!keepParameterServiceFlag) { - // Register the parameters with the parameter service - registerParameters(parameters); - } - return parameters; } |