From 40a1f22ff8d28e78b6512c0a10d454b37f015fdb Mon Sep 17 00:00:00 2001 From: "a.sreekumar" Date: Tue, 5 Nov 2019 14:37:09 +0000 Subject: Retaining context in APEX Engine based on policies received in PdpUpdate Change-Id: I73fad5bf76ed6b4979f5ab76013f204ea82da30b Issue-ID: POLICY-2215 Signed-off-by: a.sreekumar --- .../apex/service/engine/main/ApexActivator.java | 137 +++++++++++++-------- .../policy/apex/service/engine/main/ApexMain.java | 67 ++++++++++ .../engine/runtime/impl/EngineServiceImpl.java | 20 ++- .../service/engine/runtime/impl/EngineWorker.java | 17 +-- .../service/parameters/ApexParameterHandler.java | 4 +- 5 files changed, 176 insertions(+), 69 deletions(-) (limited to 'services/services-engine') diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 2c3fac151..6c86c1eff 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -21,11 +21,13 @@ package org.onap.policy.apex.service.engine.main; +import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.stream.Stream; import lombok.Getter; +import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.service.ModelService; @@ -59,8 +61,12 @@ public class ApexActivator { // The parameters of the Apex activator when running with multiple policies @Getter + @Setter private Map apexParametersMap; + @Getter + Map policyModelsMap; + // Event unmarshalers are used to receive events asynchronously into Apex private final Map unmarshallerMap = new LinkedHashMap<>(); @@ -99,46 +105,7 @@ public class ApexActivator { .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); instantiateEngine(apexParameters); - Map policyModelsMap = new LinkedHashMap<>(); - Map inputParametersMap = new LinkedHashMap<>(); - Map outputParametersMap = new LinkedHashMap<>(); - - for (Entry apexParamsEntry : apexParametersMap.entrySet()) { - ApexParameters apexParams = apexParamsEntry.getValue(); - boolean duplicateInputParameterExist = - apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); - boolean duplicateOutputParameterExist = - apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); - if (duplicateInputParameterExist || duplicateOutputParameterExist) { - LOGGER.error("I/O Parameters for " + apexParamsEntry.getKey().getName() + ":" - + apexParamsEntry.getKey().getVersion() - + " has duplicates. So this policy is not executed"); - apexParametersMap.remove(apexParamsEntry.getKey()); - continue; - } else { - inputParametersMap.putAll(apexParams.getEventInputParameters()); - outputParametersMap.putAll(apexParams.getEventOutputParameters()); - } - // Check if a policy model file has been specified - if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { - LOGGER.debug("deploying policy model in \"" - + apexParams.getEngineServiceParameters().getPolicyModelFileName() - + "\" to the apex engines . . ."); - - final String policyModelString = TextFileUtils - .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); - AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); - policyModelsMap.put(apexParamsEntry.getKey(), policyModel); - } - } - AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); - // Set the policy model in the engine - apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), - finalPolicyModel, true); - setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); - setUpmarshalerPairings(inputParametersMap); + setUpModelMarhsallerAndUnmarshaller(apexParameters); } catch (final Exception e) { LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); @@ -147,8 +114,50 @@ public class ApexActivator { LOGGER.debug("Apex engine started as a service"); } + private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException { + policyModelsMap = new LinkedHashMap<>(); + Map inputParametersMap = new LinkedHashMap<>(); + Map outputParametersMap = new LinkedHashMap<>(); + + for (Entry apexParamsEntry : apexParametersMap.entrySet()) { + ApexParameters apexParams = apexParamsEntry.getValue(); + boolean duplicateInputParameterExist = + apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); + boolean duplicateOutputParameterExist = + apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); + if (duplicateInputParameterExist || duplicateOutputParameterExist) { + LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.", + apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); + apexParametersMap.remove(apexParamsEntry.getKey()); + continue; + } + inputParametersMap.putAll(apexParams.getEventInputParameters()); + outputParametersMap.putAll(apexParams.getEventOutputParameters()); + // Check if a policy model file has been specified + if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { + LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .", + apexParams.getEngineServiceParameters().getPolicyModelFileName()); + + final String policyModelString = + TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); + AxPolicyModel policyModel = EngineServiceImpl + .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + policyModelsMap.put(apexParamsEntry.getKey(), policyModel); + } + } + AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + // Set the policy model in the engine + apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, + true); + setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, + outputParametersMap); + setUpMarshalerPairings(inputParametersMap); + } + private AxPolicyModel aggregatePolicyModels(Map policyModelsMap) { Map.Entry firstEntry = policyModelsMap.entrySet().iterator().next(); + ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey()); + AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue()); Stream> policyModelStream = policyModelsMap.entrySet().stream().skip(1); Entry finalPolicyModelEntry = @@ -157,13 +166,16 @@ public class ApexActivator { entry1.setValue( PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); } catch (ApexModelException exc) { - LOGGER.error("Policy model for " + entry2.getKey().getName() + ":" + entry2.getKey().getVersion() - + " is having duplicates. So this policy is not executed", exc.getMessage()); + LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", + entry2.getKey().getName(), entry2.getKey().getVersion(), exc); apexParametersMap.remove(entry2.getKey()); + policyModelsMap.remove(entry2.getKey()); } return entry1; })); - return finalPolicyModelEntry.getValue(); + AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue()); + policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap + return finalPolicyModel; } private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, @@ -215,7 +227,7 @@ public class ApexActivator { * paired marshaler * @param inputParametersMap the apex parameters */ - private void setUpmarshalerPairings(Map inputParametersMap) { + private void setUpMarshalerPairings(Map inputParametersMap) { for (final Entry inputParameters : inputParametersMap.entrySet()) { final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey()); @@ -236,6 +248,23 @@ public class ApexActivator { } } + /** + * Updates the APEX Engine with the model created from new Policies. + * + * @param apexParamsMap the apex parameters map for the Apex service + * @throws ApexException on errors + */ + public void updateModel(Map apexParamsMap) throws ApexException { + try { + shutdownMarshallerAndUnmarshaller(); + ApexParameters apexParameters = apexParamsMap.values().iterator().next(); + setUpModelMarhsallerAndUnmarshaller(apexParameters); + } catch (final Exception e) { + LOGGER.debug(APEX_ENGINE_FAILED_MSG, e); + throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e); + } + } + /** * Terminate the Apex engine. * @@ -243,15 +272,7 @@ public class ApexActivator { */ public void terminate() throws ApexException { // Shut down all marshalers and unmarshalers - for (final ApexEventMarshaller marshaller : marshallerMap.values()) { - marshaller.stop(); - } - marshallerMap.clear(); - - for (final ApexEventUnmarshaller unmarshaller : unmarshallerMap.values()) { - unmarshaller.stop(); - } - unmarshallerMap.clear(); + shutdownMarshallerAndUnmarshaller(); // Check if the engine service handler has been shut down already if (engineServiceHandler != null) { @@ -263,4 +284,14 @@ public class ApexActivator { ModelService.clear(); ParameterService.clear(); } + + /** + * Shuts down all marshallers and unmarshallers. + */ + private void shutdownMarshallerAndUnmarshaller() { + marshallerMap.values().forEach(ApexEventMarshaller::stop); + marshallerMap.clear(); + unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop); + unmarshallerMap.clear(); + } } diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java index 4600690e7..14b57b2d1 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java @@ -26,9 +26,15 @@ import java.util.Base64; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import lombok.Getter; import lombok.Setter; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; +import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; +import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; import org.onap.policy.apex.service.parameters.ApexParameterHandler; import org.onap.policy.apex.service.parameters.ApexParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; @@ -128,6 +134,67 @@ public class ApexMain { LOGGER.exit("Started Apex"); } + /** + * Updates the APEX Engine with the model created from new Policies. + * + * @param policyArgsMap the map with command line arguments as value and policy-id as key + * @throws ApexException on errors + */ + public void updateModel(Map policyArgsMap) throws ApexException { + apexParametersMap.clear(); + AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class); + Map albumsMap = new TreeMap<>(); + for (Entry policyArgsEntry : policyArgsMap.entrySet()) { + findAlbumsToHold(albumsMap, policyArgsEntry.getKey()); + try { + apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue())); + } catch (ApexException e) { + LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(), + policyArgsEntry.getKey().getVersion(), e); + } + } + try { + if (albumsMap.isEmpty()) { + // clear context since none of the policies' context albums has to be retained + // this could be because all policies have a major version change, + // or a full set of new policies are received in the update message + activator.terminate(); + // ParameterService is cleared when activator is terminated. Register the engine parameters in this case + new ApexParameterHandler().registerParameters(apexParametersMap.values().iterator().next()); + activator = new ApexActivator(apexParametersMap); + activator.initialize(); + setAlive(true); + } else { + albums.setAlbumsMap(albumsMap); + activator.setApexParametersMap(apexParametersMap); + activator.updateModel(apexParametersMap); + } + } catch (final ApexException e) { + LOGGER.error(APEX_SERVICE_FAILED_MSG, e); + activator.terminate(); + throw new ApexException(e.getMessage()); + } + } + + /** + * Find the context albums which should be retained when updating the policies. + * + * @param albumsMap the albums which should be kept during model update + * @param policyId the policy id of current policy + */ + private void findAlbumsToHold(Map albumsMap, ToscaPolicyIdentifier policyId) { + for (Entry policyModelsEntry : activator.getPolicyModelsMap() + .entrySet()) { + // If a policy with the same major version is received in PDP_UPDATE, + // context for that policy has to be retained. For this, take such policies' albums + if (policyModelsEntry.getKey().getName().equals(policyId.getName()) + && policyModelsEntry.getKey().getVersion().split("\\.")[0] + .equals(policyId.getVersion().split("\\.")[0])) { + albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap()); + } + } + } + private ApexParameters populateApexParameters(String[] args) throws ApexException { // Check the arguments final ApexCommandLineArguments arguments = new ApexCommandLineArguments(); diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java index 8247fd5f3..f5e36e864 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java @@ -77,8 +77,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven private AxArtifactKey engineServiceKey = null; // The Apex engine workers this engine service is handling - private final Map engineWorkerMap = Collections - .synchronizedMap(new LinkedHashMap()); + private final Map engineWorkerMap = Collections + .synchronizedMap(new LinkedHashMap()); // Event queue for events being sent into the Apex engines, it used by all engines within a // group. @@ -342,9 +342,17 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven } // Update the engines - for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { + boolean isSubsequentInstance = false; + for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId()); - engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag); + EngineWorker engineWorker = engineWorkerEntry.getValue(); + if (isSubsequentInstance) { + // set subsequentInstance flag as true if the current engine worker instance is not the first one + // first engine instance will have this flag as false + engineWorker.setSubsequentInstance(true); + } + engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag); + isSubsequentInstance = true; } // start all engines on this engine service if it was not stopped before the update @@ -355,7 +363,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven } // Check if all engines are running final StringBuilder notRunningEngineIdBuilder = new StringBuilder(); - for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { + for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { if (engineWorkerEntry.getValue().getState() != AxEngineState.READY && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) { notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId()); @@ -387,7 +395,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven } // Check if all engines are stopped final StringBuilder notStoppedEngineIdBuilder = new StringBuilder(); - for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { + for (final Entry engineWorkerEntry : engineWorkerMap.entrySet()) { if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) { notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId()); notStoppedEngineIdBuilder.append('('); diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java index a24d9618a..e00515bd0 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java @@ -6,15 +6,15 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -25,7 +25,6 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; @@ -33,7 +32,7 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; - +import lombok.Setter; import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.context.ContextRuntimeException; import org.onap.policy.apex.context.SchemaHelper; @@ -98,6 +97,9 @@ final class EngineWorker implements EngineService { // Converts ApexEvent instances to and from EnEvent instances private ApexEvent2EnEventConverter apexEnEventConverter = null; + @Setter + private boolean isSubsequentInstance; + /** * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an * {@link ApexModelReader} instance to read Apex models using JAXB. @@ -236,9 +238,8 @@ final class EngineWorker implements EngineService { } } } - // Update the Apex model in the Apex engine - engine.updateModel(apexModel); + engine.updateModel(apexModel, isSubsequentInstance); LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey); LOGGER.exit(); @@ -613,7 +614,7 @@ final class EngineWorker implements EngineService { /** * Debug the event if debug is enabled. - * + * * @param event the event to debug */ private void debugEventIfDebugEnabled(ApexEvent event) { diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java index c1ef50bd7..553ed18c6 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java @@ -130,10 +130,10 @@ public class ApexParameterHandler { /** * Register all the incoming parameters with the parameter service. - * + * * @param parameters The parameters to register */ - private void registerParameters(ApexParameters parameters) { + public void registerParameters(ApexParameters parameters) { ParameterService.register(parameters); ParameterService.register(parameters.getEngineServiceParameters()); ParameterService.register(parameters.getEngineServiceParameters().getEngineParameters()); -- cgit 1.2.3-korg