summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@bell.ca>2021-01-25 11:03:42 +0000
committera.sreekumar <ajith.sreekumar@bell.ca>2021-01-28 13:14:51 +0000
commit658e67bc821a3bc55f2c6d877e7e0baa21427333 (patch)
tree5ab53c6026317858aec8b61bf0aec37459247cf5 /services
parent84f92b44e70ce27bb4213da677d50ac91169432c (diff)
Improve handling of multiple policy in APEX PDP
Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39 Issue-ID: POLICY-2883 Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java278
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java13
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java175
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java4
-rw-r--r--services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java75
-rw-r--r--services/services-engine/src/test/resources/parameters/correctParams.json3
-rw-r--r--services/services-engine/src/test/resources/parameters/correctParams2.json513
-rw-r--r--services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java203
8 files changed, 907 insertions, 357 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index cadcc7ee8..3451c120c 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,35 +22,41 @@
package org.onap.policy.apex.service.engine.main;
-import java.util.AbstractMap;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicies;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicy;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxTask;
+import org.onap.policy.apex.model.policymodel.concepts.AxTasks;
import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
+import org.onap.policy.apex.service.parameters.ApexParameterConstants;
import org.onap.policy.apex.service.parameters.ApexParameters;
import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.onap.policy.common.parameters.ParameterService;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
@@ -66,13 +72,12 @@ public class ApexActivator {
// The logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
- // The parameters of the Apex activator when running with multiple policies
@Getter
@Setter
- private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap;
+ private ApexParameters apexParameters;
@Getter
- Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap;
+ private AxPolicyModel policyModel;
// Event unmarshalers are used to receive events asynchronously into Apex
private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
@@ -93,10 +98,10 @@ public class ApexActivator {
/**
* Instantiate the activator for the Apex engine as a complete service.
*
- * @param parametersMap the apex parameters map for the Apex service
+ * @param parameters the apex parameters for the Apex service
*/
- public ApexActivator(Map<ToscaConceptIdentifier, ApexParameters> parametersMap) {
- apexParametersMap = parametersMap;
+ public ApexActivator(ApexParameters parameters) {
+ apexParameters = parameters;
}
/**
@@ -108,96 +113,131 @@ public class ApexActivator {
LOGGER.debug("Apex engine starting as a service . . .");
try {
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- // totalInstanceCount is the sum of instance counts required as per each policy
- int totalInstanceCount = apexParametersMap.values().stream()
- .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
- apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
- engineKey = apexParameters.getEngineServiceParameters().getEngineKey();
instantiateEngine(apexParameters);
setUpModelMarshallerAndUnmarshaller(apexParameters);
} catch (final Exception e) {
- LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
+ try {
+ terminate();
+ } catch (ApexException e1) {
+ LOGGER.error("Terminating the ApexActivator encountered error", e1);
+ }
throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
}
LOGGER.debug("Apex engine started as a service");
}
- private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
- policyModelsMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
- Set<Entry<ToscaConceptIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>(
- apexParametersMap.entrySet());
- apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
- ApexParameters apexParams = apexParamsEntry.getValue();
- List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet());
- duplicateInputParameters.retainAll(inputParametersMap.keySet());
- List<String> duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet());
- duplicateOutputParameters.retainAll(outputParametersMap.keySet());
-
- if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
- LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.",
- duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(),
- apexParamsEntry.getKey().getVersion());
- apexParametersMap.remove(apexParamsEntry.getKey());
- return;
- }
- inputParametersMap.putAll(apexParams.getEventInputParameters());
- outputParametersMap.putAll(apexParams.getEventOutputParameters());
- // Check if a policy model file has been specified
- if (apexParams.getEngineServiceParameters().getPolicyModel() != null) {
- LOGGER.debug("deploying policy model to the apex engines . . .");
- try {
- final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel();
- AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString);
- policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
- } catch (ApexException e) {
- throw new ApexRuntimeException("Failed to create the apex model.", e);
- }
- }
- });
- AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+ private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
+ if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
+ throw new ApexException("Apex Engine already initialized.");
+ }
+ // Create engine with specified thread count
+ LOGGER.debug("starting apex engine service . . .");
+ apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
+
+ // Create the engine holder to hold the engine's references and act as an event
+ // receiver
+ engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
+ }
+
+ private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
+ AxPolicyModel model;
+ try {
+ final String policyModelString = apexParameters.getEngineServiceParameters().getPolicyModel();
+ model = EngineServiceImpl.createModel(apexParameters.getEngineServiceParameters().getEngineKey(),
+ policyModelString);
+ } catch (ApexException e) {
+ throw new ApexRuntimeException("Failed to create the apex model.", e);
+ }
+ AxKeyInformation existingKeyInformation = null;
+ AxContextSchemas existingSchemas = null;
+ AxEvents existingEvents = null;
+ AxContextAlbums existingAlbums = null;
+ AxTasks existingTasks = null;
+ AxPolicies existingPolicies = null;
+ if (ModelService.existsModel(AxPolicyModel.class)) {
+ existingKeyInformation = new AxKeyInformation(ModelService.getModel(AxKeyInformation.class));
+ existingSchemas = new AxContextSchemas(ModelService.getModel(AxContextSchemas.class));
+ existingEvents = new AxEvents(ModelService.getModel(AxEvents.class));
+ existingAlbums = new AxContextAlbums(ModelService.getModel(AxContextAlbums.class));
+ existingTasks = new AxTasks(ModelService.getModel(AxTasks.class));
+ existingPolicies = new AxPolicies(ModelService.getModel(AxPolicies.class));
+ }
// Set the policy model in the engine
- apexEngineService.updateModel(engineKey, finalPolicyModel, true);
+ try {
+ apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), model, true);
+ policyModel = new AxPolicyModel(model);
+ } catch (ApexException exp) {
+ // Discard concepts from the current policy
+ if (null != existingKeyInformation) {
+ // Update model service with other policies' concepts.
+ ModelService.registerModel(AxKeyInformation.class, existingKeyInformation);
+ ModelService.registerModel(AxContextSchemas.class, existingSchemas);
+ ModelService.registerModel(AxEvents.class, existingEvents);
+ ModelService.registerModel(AxContextAlbums.class, existingAlbums);
+ ModelService.registerModel(AxTasks.class, existingTasks);
+ ModelService.registerModel(AxPolicies.class, existingPolicies);
+ } else {
+ ModelService.clear();
+ }
+ throw exp;
+ }
+ if (null != existingKeyInformation) {
+ // Make sure all concepts in previously deployed policies are retained in ModelService
+ // during multi policy deployment.
+ updateModelService(existingKeyInformation, existingSchemas, existingEvents, existingAlbums, existingTasks,
+ existingPolicies);
+ }
- handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap);
- setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
- outputParametersMap);
+ setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(),
+ apexParameters.getEventInputParameters(), apexParameters.getEventOutputParameters());
// Wire up pairings between marhsallers and unmarshallers
- setUpMarshalerPairings(inputParametersMap);
+ setUpMarshalerPairings(apexParameters.getEventInputParameters());
// Start event processing
- startUnmarshallers(inputParametersMap);
+ startUnmarshallers(apexParameters.getEventInputParameters());
}
- private AxPolicyModel aggregatePolicyModels(Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap) {
- // Doing a deep copy so that original values in policyModelsMap is retained
- // after reduction operation
- Set<Entry<ToscaConceptIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
- .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
- Optional<Entry<ToscaConceptIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream()
- .reduce((entry1, entry2) -> {
- try {
- entry1.setValue(
- PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
- } catch (ApexModelException exc) {
- LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
- entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
- apexParametersMap.remove(entry2.getKey());
- policyModelsMap.remove(entry2.getKey());
- }
- return entry1;
- });
- AxPolicyModel finalPolicyModel = null;
- if (finalPolicyModelEntry.isPresent()) {
- finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
+ private void updateModelService(AxKeyInformation existingKeyInformation,
+ AxContextSchemas existingSchemas, AxEvents existingEvents,
+ AxContextAlbums existingAlbums, AxTasks existingTasks,
+ AxPolicies existingPolicies) throws ApexModelException {
+
+ AxContextSchemas axContextSchemas = ModelService.getModel(AxContextSchemas.class);
+ AxEvents axEvents = ModelService.getModel(AxEvents.class);
+ AxContextAlbums axContextAlbums = ModelService.getModel(AxContextAlbums.class);
+ AxTasks axTasks = ModelService.getModel(AxTasks.class);
+ AxPolicies axPolicies = ModelService.getModel(AxPolicies.class);
+
+ Map<AxArtifactKey, AxContextSchema> newSchemasMap = axContextSchemas.getSchemasMap();
+ Map<AxArtifactKey, AxEvent> newEventsMap = axEvents.getEventMap();
+ Map<AxArtifactKey, AxContextAlbum> newAlbumsMap = axContextAlbums.getAlbumsMap();
+ Map<AxArtifactKey, AxTask> newTasksMap = axTasks.getTaskMap();
+ Map<AxArtifactKey, AxPolicy> newPoliciesMap = axPolicies.getPolicyMap();
+
+ StringBuilder errorMessage = new StringBuilder();
+ PolicyModelMerger.checkForDuplicateItem(existingSchemas.getSchemasMap(), newSchemasMap, errorMessage, "schema");
+ PolicyModelMerger.checkForDuplicateItem(existingEvents.getEventMap(), newEventsMap, errorMessage, "event");
+ PolicyModelMerger.checkForDuplicateItem(existingAlbums.getAlbumsMap(), newAlbumsMap, errorMessage, "album");
+ PolicyModelMerger.checkForDuplicateItem(existingTasks.getTaskMap(), newTasksMap, errorMessage, "task");
+ PolicyModelMerger.checkForDuplicateItem(existingPolicies.getPolicyMap(), newPoliciesMap, errorMessage,
+ "policy");
+ if (errorMessage.length() > 0) {
+ throw new ApexModelException(errorMessage.toString());
}
- return finalPolicyModel;
+
+ AxKeyInformation axKeyInformation = ModelService.getModel(AxKeyInformation.class);
+ Map<AxArtifactKey, AxKeyInfo> newKeyInfoMap = axKeyInformation.getKeyInfoMap();
+ // Now add all the concepts that must be copied over
+ newKeyInfoMap.putAll(existingKeyInformation.getKeyInfoMap());
+ newSchemasMap.putAll(existingSchemas.getSchemasMap());
+ newEventsMap.putAll(existingEvents.getEventMap());
+ newAlbumsMap.putAll(existingAlbums.getAlbumsMap());
+ newTasksMap.putAll(existingTasks.getTaskMap());
+ newPoliciesMap.putAll(existingPolicies.getPolicyMap());
}
private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
@@ -226,38 +266,6 @@ public class ApexActivator {
}
}
- private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
- Map<String, EventHandlerParameters> outputParametersMap) {
- // stop and remove any marshaller/unmarshaller that is part of a policy that is
- // undeployed
- marshallerMap.entrySet().stream()
- .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
- .forEach(marshallerEntry -> marshallerEntry.getValue().stop());
- marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey));
- unmarshallerMap.entrySet().stream()
- .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey()))
- .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
- unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
-
- // If a marshaller/unmarshaller is already initialized, they don't need to be
- // reinitialized during model update.
- outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
- inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
- }
-
- private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
- if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
- throw new ApexException("Apex Engine already initialized.");
- }
- // Create engine with specified thread count
- LOGGER.debug("starting apex engine service . . .");
- apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
-
- // Create the engine holder to hold the engine's references and act as an event
- // receiver
- engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
- }
-
/**
* Set up unmarshaler/marshaler pairing for synchronized event handling. We only
* need to traverse the unmarshalers because the unmarshalers and marshalers are
@@ -275,8 +283,8 @@ public class ApexActivator {
// Check if the unmarshaler is synchronized with a marshaler
if (inputParameters.getValue().isPeeredMode(peeredMode)) {
// Find the unmarshaler and marshaler
- final ApexEventMarshaller peeredMarshaler = marshallerMap
- .get(inputParameters.getValue().getPeer(peeredMode));
+ final ApexEventMarshaller peeredMarshaler =
+ marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
// Connect the unmarshaler and marshaler
unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
@@ -298,22 +306,6 @@ public class ApexActivator {
}
/**
- * Updates the APEX Engine with the model created from new Policies.
- *
- * @param apexParamsMap the apex parameters map for the Apex service
- * @throws ApexException on errors
- */
- public void updateModel(Map<ToscaConceptIdentifier, ApexParameters> apexParamsMap) throws ApexException {
- try {
- ApexParameters apexParameters = apexParamsMap.values().iterator().next();
- setUpModelMarshallerAndUnmarshaller(apexParameters);
- } catch (final Exception e) {
- LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
- throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
- }
- }
-
- /**
* Get the Apex engine worker stats.
*/
public List<AxEngineModel> getEngineStats() {
@@ -338,19 +330,25 @@ public class ApexActivator {
engineServiceHandler.terminate();
engineServiceHandler = null;
}
-
- // Clear the services
- ModelService.clear();
- ParameterService.clear();
+ // Clear the services in case if this was the only policy running in the engine
+ ApexParameters apexParams = null;
+ if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) {
+ apexParams = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
+ }
+ if (null != apexParams
+ && apexParameters.getEventInputParameters().size() == apexParams.getEventInputParameters().size()) {
+ ModelService.clear();
+ ParameterService.clear();
+ }
}
/**
* Shuts down all marshallers and unmarshallers.
*/
private void shutdownMarshallerAndUnmarshaller() {
- marshallerMap.values().forEach(ApexEventMarshaller::stop);
- marshallerMap.clear();
unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
unmarshallerMap.clear();
+ marshallerMap.values().forEach(ApexEventMarshaller::stop);
+ marshallerMap.clear();
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
index fd6ac4489..fdc404c67 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -347,17 +347,16 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Order the stop
stopOrderedFlag = true;
- // Order a stop on the synchronous cache if one exists
- if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
- && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
- ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
- }
-
// Wait for thread shutdown
while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
}
+ // Order a stop on the synchronous cache if one exists
+ if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
+ && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
+ ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
+ }
LOGGER.exit("shut down Apex event unmarshaller");
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
index 5707c95ad..65c2acffa 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modification Copyright (C) 2019-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,26 +24,20 @@ package org.onap.policy.apex.service.engine.main;
import java.util.Arrays;
import java.util.Base64;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.core.engine.EngineParameters;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
-import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
-import org.onap.policy.apex.model.basicmodel.service.ModelService;
-import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
-import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
-import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.service.parameters.ApexParameterConstants;
import org.onap.policy.apex.service.parameters.ApexParameterHandler;
import org.onap.policy.apex.service.parameters.ApexParameters;
+import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.common.parameters.ParameterService;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
@@ -58,14 +52,12 @@ public class ApexMain {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexMain.class);
// The Apex Activator that activates the Apex engine
+ @Getter
private ApexActivator activator;
// The parameters read in from JSON for each policy
@Getter
- private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap;
-
- //engineParameters are aggregated in case of multiple policies
- private EngineParameters aggregatedEngineParameters;
+ private ApexParameters apexParameters;
private ApexParameterHandler apexParameterHandler = new ApexParameterHandler();
@@ -77,73 +69,27 @@ public class ApexMain {
* Instantiates the Apex service.
*
* @param args the command line arguments
+ * @throws ApexException the apex exception.
*/
- public ApexMain(final String[] args) {
+ public ApexMain(final String[] args) throws ApexException {
LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . .");
- apexParametersMap = new LinkedHashMap<>();
- aggregatedEngineParameters = new EngineParameters();
try {
- apexParametersMap.put(new ToscaConceptIdentifier(), populateApexParameters(args));
+ apexParameters = populateApexParameters(args);
} catch (ApexException e) {
LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
return;
}
- apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next());
- // Now, create the activator for the Apex service
- activator = new ApexActivator(apexParametersMap);
-
- // Start the activator
- try {
- activator.initialize();
- setAlive(true);
- } catch (final ApexActivatorException e) {
- LOGGER.error("start of Apex service failed, used parameters are " + Arrays.toString(args), e);
- return;
- }
-
- // Add a shutdown hook to shut everything down in an orderly manner
- Runtime.getRuntime().addShutdownHook(new ApexMainShutdownHookClass());
- LOGGER.exit("Started Apex");
- }
-
- /**
- * Instantiates the Apex service for multiple policies.
- *
- * @param policyArgumentsMap the map with command line arguments as value and policy-id as key
- * @throws ApexException on errors
- */
- public ApexMain(Map<ToscaConceptIdentifier, String[]> policyArgumentsMap) throws ApexException {
- apexParametersMap = new LinkedHashMap<>();
- aggregatedEngineParameters = new EngineParameters();
- for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) {
- try {
- apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
- } catch (ApexException e) {
- LOGGER.error("Invalid arguments specified for policy - " + policyArgsEntry.getKey().getName() + ":"
- + policyArgsEntry.getKey().getVersion(), e);
- }
- }
- if (apexParametersMap.isEmpty()) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG);
- return;
- }
+ aggregateParametersAndRegister();
- // Set the aggregated engineParameters which will be used later while creating the engine
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
- apexParameterHandler.registerParameters(apexParameters);
// Now, create the activator for the Apex service
- activator = new ApexActivator(apexParametersMap);
+ activator = new ApexActivator(apexParameters);
// Start the activator
try {
activator.initialize();
- apexParametersMap = activator.getApexParametersMap();
setAlive(true);
} catch (final ApexActivatorException e) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
- activator.terminate();
- return;
+ throw new ApexException("start of Apex service failed, used parameters are " + Arrays.toString(args), e);
}
// Add a shutdown hook to shut everything down in an orderly manner
@@ -151,75 +97,6 @@ public class ApexMain {
LOGGER.exit("Started Apex");
}
- /**
- * Updates the APEX Engine with the model created from new Policies.
- *
- * @param policyArgsMap the map with command line arguments as value and policy-id as key
- * @throws ApexException on errors
- */
- public void updateModel(Map<ToscaConceptIdentifier, String[]> policyArgsMap) throws ApexException {
- // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine
- boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey);
- apexParametersMap.clear();
- aggregatedEngineParameters = new EngineParameters();
- AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class);
- Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>();
- for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) {
- findAlbumsToHold(albumsMap, policyArgsEntry.getKey());
- try {
- apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
- } catch (ApexException e) {
- LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(),
- policyArgsEntry.getKey().getVersion(), e);
- }
- }
- // Set the aggregated engineParameters
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
- ParameterService.clear();
- apexParameterHandler.registerParameters(apexParameters);
- try {
- if (albumsMap.isEmpty() && !isAnyPolicyDeployed) {
- // clear context since none of the policies' context albums has to be retained
- // this could be because all policies have a major version change,
- // or a full set of new policies are received in the update message
- activator.terminate();
- // ParameterService is cleared when activator is terminated. Register the parameters again in this case
- apexParameterHandler.registerParameters(apexParameters);
- activator = new ApexActivator(apexParametersMap);
- activator.initialize();
- setAlive(true);
- } else {
- albums.setAlbumsMap(albumsMap);
- activator.setApexParametersMap(apexParametersMap);
- activator.updateModel(apexParametersMap);
- }
- } catch (final ApexException e) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
- activator.terminate();
- throw new ApexException(e.getMessage());
- }
- }
-
- /**
- * Find the context albums which should be retained when updating the policies.
- *
- * @param albumsMap the albums which should be kept during model update
- * @param policyId the policy id of current policy
- */
- private void findAlbumsToHold(Map<AxArtifactKey, AxContextAlbum> albumsMap, ToscaConceptIdentifier policyId) {
- for (Entry<ToscaConceptIdentifier, AxPolicyModel> policyModelsEntry : activator.getPolicyModelsMap()
- .entrySet()) {
- // If a policy with the same major version is received in PDP_UPDATE,
- // context for that policy has to be retained. For this, take such policies' albums
- if (policyModelsEntry.getKey().getName().equals(policyId.getName())
- && policyModelsEntry.getKey().getVersion().split("\\.")[0]
- .equals(policyId.getVersion().split("\\.")[0])) {
- albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap());
- }
- }
- }
-
private ApexParameters populateApexParameters(String[] args) throws ApexException {
// Check the arguments
final ApexCommandLineArguments arguments = new ApexCommandLineArguments();
@@ -263,11 +140,32 @@ public class ApexMain {
ehParameterEntry.getValue().setName(ehParameterEntry.getKey());
}
}
- aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters());
return axParameters;
}
- private void aggregateEngineParameters(EngineParameters engineParameters) {
+ private void aggregateParametersAndRegister() throws ApexException {
+ ApexParameters aggregatedParameters = null;
+ if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) {
+ aggregatedParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
+ } else {
+ aggregatedParameters = new ApexParameters();
+ aggregatedParameters.setEngineServiceParameters(new EngineServiceParameters());
+ apexParameterHandler.registerParameters(aggregatedParameters);
+ }
+ List<String> duplicateInputParameters = aggregatedParameters.getEventInputParameters().keySet().stream()
+ .filter(apexParameters.getEventInputParameters()::containsKey).collect(Collectors.toList());
+ List<String> duplicateOutputParameters = aggregatedParameters.getEventOutputParameters().keySet().stream()
+ .filter(apexParameters.getEventOutputParameters()::containsKey).collect(Collectors.toList());
+ if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
+ throw new ApexException(
+ "start of Apex service failed because this policy has the following duplicate I/O parameters: "
+ + duplicateInputParameters + "/" + duplicateOutputParameters);
+ }
+ aggregatedParameters.getEventInputParameters().putAll(apexParameters.getEventInputParameters());
+ aggregatedParameters.getEventOutputParameters().putAll(apexParameters.getEventOutputParameters());
+ EngineParameters aggregatedEngineParameters =
+ aggregatedParameters.getEngineServiceParameters().getEngineParameters();
+ EngineParameters engineParameters = apexParameters.getEngineServiceParameters().getEngineParameters();
aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters());
aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap());
aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap()
@@ -346,8 +244,9 @@ public class ApexMain {
* The main method.
*
* @param args the arguments
+ * @throws ApexException the apex exception.
*/
- public static void main(final String[] args) {
+ public static void main(final String[] args) throws ApexException {
new ApexMain(args);
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
index 9a07b4ad5..673d9cab5 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -73,8 +73,6 @@ public class ApexParameterHandler {
*/
public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException {
- ParameterService.clear();
-
ApexParameters parameters = null;
String toscaPolicyFilePath = arguments.getToscaPolicyFilePath();
// Read the parameters
diff --git a/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java b/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java
index 68472d4de..2cb12c397 100644
--- a/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java
+++ b/services/services-engine/src/test/java/org/onap/policy/apex/service/engine/main/ApexMainTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2020-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,21 +23,22 @@
package org.onap.policy.apex.service.engine.main;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
-import org.onap.policy.apex.service.parameters.ApexParameters;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.common.parameters.ParameterService;
/**
* Test the ApexMain class.
@@ -114,13 +115,13 @@ public class ApexMainTest {
OutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
- String[] args = { "-p", "src/test/resources/parameters/correctParams.json" };
+ String[] args = {"-p", "src/test/resources/parameters/correctParams.json"};
final ApexMain apexMain = new ApexMain(args);
- assertEquals("MyApexEngine",
- apexMain.getApexParametersMap().values().iterator().next().getEngineServiceParameters().getName());
- await().atMost(200, TimeUnit.MILLISECONDS).until(() -> outContent.toString()
- .contains("Added the action listener to the engine"));
+ assertEquals("MyApexEngine", apexMain.getApexParameters().getEngineServiceParameters().getName());
+ await().atMost(200, TimeUnit.MILLISECONDS)
+ .until(() -> outContent.toString().contains("Added the action listener to the engine"));
+ assertTrue(apexMain.isAlive());
apexMain.shutdown();
}
@@ -129,16 +130,15 @@ public class ApexMainTest {
OutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
- String[] args = { "-p", "src/test/resources/parameters/correctParamsJavaProperties.json" };
+ String[] args = {"-p", "src/test/resources/parameters/correctParamsJavaProperties.json"};
final ApexMain apexMain = new ApexMain(args);
- assertEquals("MyApexEngine",
- apexMain.getApexParametersMap().values().iterator().next().getEngineServiceParameters().getName());
+ assertEquals("MyApexEngine", apexMain.getApexParameters().getEngineServiceParameters().getName());
assertEquals("trust-store-file", System.getProperty("javax.net.ssl.trustStore"));
assertEquals("Pol1cy_0nap", System.getProperty("javax.net.ssl.trustStorePassword"));
- await().atMost(10000, TimeUnit.MILLISECONDS).until(() -> outContent.toString()
- .contains("Added the action listener to the engine"));
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .until(() -> outContent.toString().contains("Added the action listener to the engine"));
apexMain.shutdown();
}
@@ -146,46 +146,41 @@ public class ApexMainTest {
public void testCorrectParametersWithMultiplePolicies() throws ApexException {
OutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
- Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>();
- String[] args = {"-p", "src/test/resources/parameters/correctParams.json"};
- argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args);
- final ApexMain apexMain = new ApexMain(argsMap);
- ApexParameters apexParam = (ApexParameters) apexMain.getApexParametersMap().values().toArray()[0];
- assertEquals("MyApexEngine", apexParam.getEngineServiceParameters().getName());
- apexMain.shutdown();
+ String[] args1 = {"-p", "src/test/resources/parameters/correctParams.json"};
+ String[] args2 = {"-p", "src/test/resources/parameters/correctParams2.json"};
+ final ApexMain apexMain1 = new ApexMain(args1);
+ final ApexMain apexMain2 = new ApexMain(args2);
+ assertEquals("MyApexEngine", apexMain1.getApexParameters().getEngineServiceParameters().getName());
+ assertEquals("MyApexEngine2", apexMain2.getApexParameters().getEngineServiceParameters().getName());
final String outString = outContent.toString();
- assertThat(outString).contains("Added the action listener to the engine");
+ assertThat(outString).contains("Added the action listener to the engine")
+ .contains("Created apex engine MyApexEngine").contains("Created apex engine MyApexEngine2");
+ assertTrue(apexMain1.isAlive());
+ assertTrue(apexMain2.isAlive());
+ apexMain1.shutdown();
+ apexMain2.shutdown();
+ ModelService.clear();
+ ParameterService.clear();
}
@Test
public void testInCorrectParametersWithMultiplePolicies() throws ApexException {
- OutputStream outContent = new ByteArrayOutputStream();
- System.setOut(new PrintStream(outContent));
- Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>();
String[] args = {"-p", "src/test/resources/parameters/correctParams.json"};
- argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args);
- argsMap.put(new ToscaConceptIdentifier("id2", "v2"), args);
- final ApexMain apexMain = new ApexMain(argsMap);
- ApexParameters apexParam = (ApexParameters) apexMain.getApexParametersMap().values().toArray()[0];
- assertEquals("MyApexEngine", apexParam.getEngineServiceParameters().getName());
- apexMain.shutdown();
- final String outString = outContent.toString();
- assertThat(outString).contains("I/O Parameters [TheFileConsumer1]/[FirstProducer] for id2:v2 are duplicates. "
- + "So this policy is not executed");
- assertEquals(1, apexMain.getApexParametersMap().size()); // only id1:v1 is kept in the map, id2:v2 failed
+ final ApexMain apexMain1 = new ApexMain(args);
+ assertThatThrownBy(() -> new ApexMain(args)).hasMessage("start of Apex service failed because this"
+ + " policy has the following duplicate I/O parameters: [TheFileConsumer1]/[FirstProducer]");
+ apexMain1.shutdown();
}
@Test
public void testInvalidArgsWithMultiplePolicies() throws ApexException {
OutputStream outContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
- Map<ToscaConceptIdentifier, String[]> argsMap = new HashMap<ToscaConceptIdentifier, String[]>();
String[] args = {"-c", "file1", "-m", "file2"};
- argsMap.put(new ToscaConceptIdentifier("id1", "v1"), args);
- final ApexMain apexMain = new ApexMain(argsMap);
+ final ApexMain apexMain = new ApexMain(args);
final String outString = outContent.toString();
apexMain.shutdown();
assertThat(outString).contains("Arguments validation failed", "start of Apex service failed");
- assertThat(apexMain.getApexParametersMap()).isEmpty(); // No policy is running in the engine
+ assertFalse(apexMain.isAlive()); // No policy is running in the engine
}
}
diff --git a/services/services-engine/src/test/resources/parameters/correctParams.json b/services/services-engine/src/test/resources/parameters/correctParams.json
index 43fef1cbe..7ea06d715 100644
--- a/services/services-engine/src/test/resources/parameters/correctParams.json
+++ b/services/services-engine/src/test/resources/parameters/correctParams.json
@@ -501,7 +501,8 @@
},
"eventProtocolParameters": {
"eventProtocol": "JSON"
- }
+ },
+ "eventName": "BasicEvent"
}
}
}
diff --git a/services/services-engine/src/test/resources/parameters/correctParams2.json b/services/services-engine/src/test/resources/parameters/correctParams2.json
new file mode 100644
index 000000000..2d2ebff23
--- /dev/null
+++ b/services/services-engine/src/test/resources/parameters/correctParams2.json
@@ -0,0 +1,513 @@
+{
+ "tosca_definitions_version": "tosca_simple_yaml_1_1_0",
+ "topology_template": {
+ "policies": [
+ {
+ "onap.policies.apex.Simplecontrolloop": {
+ "type": "onap.policies.native.Apex",
+ "type_version": "1.0.0",
+ "name": "onap.policies.apex.Simplecontrolloop",
+ "version": "1.0.0",
+ "properties": {
+ "engineServiceParameters": {
+ "name": "MyApexEngine2",
+ "version": "0.0.1",
+ "id": 45,
+ "instanceCount": 2,
+ "deploymentPort": 65522,
+ "policy_type_impl": {
+ "apexPolicyModel": {
+ "key": {
+ "name": "SmallModel",
+ "version": "0.0.1"
+ },
+ "keyInformation": {
+ "key": {
+ "name": "SmallModel_KeyInfo",
+ "version": "0.0.1"
+ },
+ "keyInfoMap": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ },
+ "UUID": "fec1b353-b35f-4384-b7d9-69622059c248",
+ "description": "Generated description for a concept called \"BasicContextAlbum\" with version \"0.0.1\" and UUID \"fec1b353-b35f-4384-b7d9-69622059c248\""
+ }
+ },
+ {
+ "key": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "UUID": "cc8d3c1a-e975-459a-bcd2-69f423eaa1f3",
+ "description": "Generated description for a concept called \"BasicEvent\" with version \"0.0.1\" and UUID \"cc8d3c1a-e975-459a-bcd2-69f423eaa1f3\""
+ }
+ },
+ {
+ "key": {
+ "name": "BasicPolicy",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicPolicy",
+ "version": "0.0.1"
+ },
+ "UUID": "d0c5d8ee-5fe7-4978-89ce-4a3e69cad043",
+ "description": "Generated description for a concept called \"BasicPolicy\" with version \"0.0.1\" and UUID \"d0c5d8ee-5fe7-4978-89ce-4a3e69cad043\""
+ }
+ },
+ {
+ "key": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "UUID": "c5651414-fc1c-493b-878d-75f0ce685c36",
+ "description": "Generated description for a concept called \"BasicTask\" with version \"0.0.1\" and UUID \"c5651414-fc1c-493b-878d-75f0ce685c36\""
+ }
+ },
+ {
+ "key": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "UUID": "790ff718-8dc0-44e0-89d8-1b3bbe238310",
+ "description": "Generated description for a concept called \"IntType\" with version \"0.0.1\" and UUID \"790ff718-8dc0-44e0-89d8-1b3bbe238310\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel",
+ "version": "0.0.1"
+ },
+ "UUID": "a1bd1f4e-713b-456b-b1a8-bb48beee28e8",
+ "description": "Generated description for a concept called \"SmallModel\" with version \"0.0.1\" and UUID \"a1bd1f4e-713b-456b-b1a8-bb48beee28e8\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_Albums",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_Albums",
+ "version": "0.0.1"
+ },
+ "UUID": "72bed9af-ab7d-3379-b9f7-b5eca5c9ef22",
+ "description": "Generated description for concept referred to by key \"SmallModel_Albums:0.0.1\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_Events",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_Events",
+ "version": "0.0.1"
+ },
+ "UUID": "796dc6b0-627d-34ae-a5e2-1bc4b4b486b8",
+ "description": "Generated description for concept referred to by key \"SmallModel_Events:0.0.1\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_KeyInfo",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_KeyInfo",
+ "version": "0.0.1"
+ },
+ "UUID": "b4876774-6907-3d27-a2b8-f05737c5ee4a",
+ "description": "Generated description for concept referred to by key \"SmallModel_KeyInfo:0.0.1\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_Policies",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_Policies",
+ "version": "0.0.1"
+ },
+ "UUID": "5bcf946b-67be-3190-a906-f954896f999f",
+ "description": "Generated description for concept referred to by key \"SmallModel_Policies:0.0.1\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_Schemas",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_Schemas",
+ "version": "0.0.1"
+ },
+ "UUID": "c25bf5c3-7f1e-3667-b8a9-971ba21517bc",
+ "description": "Generated description for concept referred to by key \"SmallModel_Schemas:0.0.1\""
+ }
+ },
+ {
+ "key": {
+ "name": "SmallModel_Tasks",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "SmallModel_Tasks",
+ "version": "0.0.1"
+ },
+ "UUID": "43b015ca-2ed1-3a35-b103-e8a5aa68f1ef",
+ "description": "Generated description for concept referred to by key \"SmallModel_Tasks:0.0.1\""
+ }
+ }
+ ]
+ }
+ },
+ "policies": {
+ "key": {
+ "name": "SmallModel_Policies",
+ "version": "0.0.1"
+ },
+ "policyMap": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicPolicy",
+ "version": "0.0.1"
+ },
+ "value": {
+ "policyKey": {
+ "name": "BasicPolicy",
+ "version": "0.0.1"
+ },
+ "template": "FREEFORM",
+ "state": {
+ "entry": [
+ {
+ "key": "OnlyState",
+ "value": {
+ "stateKey": {
+ "parentKeyName": "BasicPolicy",
+ "parentKeyVersion": "0.0.1",
+ "parentLocalName": "NULL",
+ "localName": "OnlyState"
+ },
+ "trigger": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "stateOutputs": {
+ "entry": [
+ {
+ "key": "OnlyOutput",
+ "value": {
+ "key": {
+ "parentKeyName": "BasicPolicy",
+ "parentKeyVersion": "0.0.1",
+ "parentLocalName": "OnlyState",
+ "localName": "OnlyOutput"
+ },
+ "outgoingEvent": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "nextState": {
+ "parentKeyName": "NULL",
+ "parentKeyVersion": "0.0.0",
+ "parentLocalName": "NULL",
+ "localName": "NULL"
+ }
+ }
+ }
+ ]
+ },
+ "contextAlbumReference": [
+ {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ }
+ ],
+ "taskSelectionLogic": {
+ "key": "NULL",
+ "logicFlavour": "UNDEFINED",
+ "logic": ""
+ },
+ "stateFinalizerLogicMap": {
+ "entry": []
+ },
+ "defaultTask": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "taskReferences": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "parentKeyName": "BasicPolicy",
+ "parentKeyVersion": "0.0.1",
+ "parentLocalName": "OnlyState",
+ "localName": "BasicTask"
+ },
+ "outputType": "DIRECT",
+ "output": {
+ "parentKeyName": "BasicPolicy",
+ "parentKeyVersion": "0.0.1",
+ "parentLocalName": "OnlyState",
+ "localName": "OnlyOutput"
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+ },
+ "firstState": "OnlyState"
+ }
+ }
+ ]
+ }
+ },
+ "tasks": {
+ "key": {
+ "name": "SmallModel_Tasks",
+ "version": "0.0.1"
+ },
+ "taskMap": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicTask",
+ "version": "0.0.1"
+ },
+ "inputFields": {
+ "entry": [
+ {
+ "key": "intPar",
+ "value": {
+ "key": "intPar",
+ "fieldSchemaKey": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "optional": false
+ }
+ }
+ ]
+ },
+ "outputFields": {
+ "entry": [
+ {
+ "key": "intPar",
+ "value": {
+ "key": "intPar",
+ "fieldSchemaKey": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "optional": false
+ }
+ }
+ ]
+ },
+ "taskParameters": {
+ "entry": []
+ },
+ "contextAlbumReference": [
+ {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ }
+ ],
+ "taskLogic": {
+ "key": "TaskLogic",
+ "logicFlavour": "JAVASCRIPT",
+ "logic": "executor.logger.debug(executor.subject.id);\nvar gc = executor.getContextAlbum(\"BasicContextAlbum\");\nexecutor.logger.debug(gc.name);\nexecutor.logger.debug(executor.inFields);\n\nexecutor.logger.debug(executor.eo);\n\nvar returnValue = executor.isTrue;"
+ }
+ }
+ }
+ ]
+ }
+ },
+ "events": {
+ "key": {
+ "name": "SmallModel_Events",
+ "version": "0.0.1"
+ },
+ "eventMap": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicEvent",
+ "version": "0.0.1"
+ },
+ "nameSpace": "org.onap.policy.apex.events",
+ "source": "source",
+ "target": "target",
+ "parameter": {
+ "entry": [
+ {
+ "key": "intPar",
+ "value": {
+ "key": "intPar",
+ "fieldSchemaKey": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "optional": false
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ },
+ "albums": {
+ "key": {
+ "name": "SmallModel_Albums",
+ "version": "0.0.1"
+ },
+ "albums": {
+ "entry": [
+ {
+ "key": {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "BasicContextAlbum",
+ "version": "0.0.1"
+ },
+ "scope": "GLOBAL",
+ "isWritable": true,
+ "itemSchema": {
+ "name": "IntType",
+ "version": "0.0.1"
+ }
+ }
+ }
+ ]
+ }
+ },
+ "schemas": {
+ "key": {
+ "name": "SmallModel_Schemas",
+ "version": "0.0.1"
+ },
+ "schemas": {
+ "entry": [
+ {
+ "key": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "value": {
+ "key": {
+ "name": "IntType",
+ "version": "0.0.1"
+ },
+ "schemaFlavour": "Java",
+ "schemaDefinition": "java.lang.Integer"
+ }
+ }
+ ]
+ }
+ }
+ }
+ },
+ "engineParameters": {
+ "executorParameters": {
+ "JAVASCRIPT": {
+ "parameterClassName": "org.onap.policy.apex.service.engine.parameters.dummyclasses.SuperDooperExecutorParameters"
+ }
+ }
+ }
+ },
+ "eventOutputParameters": {
+ "FirstProducer2": {
+ "carrierTechnologyParameters": {
+ "carrierTechnology": "FILE",
+ "parameters": {
+ "standardIo": true
+ }
+ },
+ "eventProtocolParameters": {
+ "eventProtocol": "JSON"
+ }
+ }
+ },
+ "eventInputParameters": {
+ "TheFileConsumer2": {
+ "carrierTechnologyParameters": {
+ "carrierTechnology": "FILE",
+ "parameters": {
+ "fileName": "src/test/resources/events/TestPojoEvent.json"
+ }
+ },
+ "eventProtocolParameters": {
+ "eventProtocol": "JSON"
+ },
+ "eventName": "BasicEvent"
+ }
+ }
+ }
+ }
+ }
+ ]
+ }
+} \ No newline at end of file
diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java
index 4d560a3bb..03e97215b 100644
--- a/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java
+++ b/services/services-onappf/src/main/java/org/onap/policy/apex/services/onappf/handler/ApexEngineHandler.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,13 +24,36 @@ package org.onap.policy.apex.services.onappf.handler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.onap.policy.apex.core.engine.EngineParameters;
+import org.onap.policy.apex.core.engine.TaskParameters;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
+import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicies;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicy;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxTask;
+import org.onap.policy.apex.model.policymodel.concepts.AxTasks;
import org.onap.policy.apex.service.engine.main.ApexMain;
+import org.onap.policy.apex.service.parameters.ApexParameterConstants;
+import org.onap.policy.apex.service.parameters.ApexParameters;
import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
+import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -49,7 +72,7 @@ public class ApexEngineHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class);
- private ApexMain apexMain;
+ private Map<ToscaConceptIdentifier, ApexMain> apexMainMap;
/**
* Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine.
@@ -57,13 +80,13 @@ public class ApexEngineHandler {
* @param policies the list of policies
* @throws ApexStarterException if the apex engine instantiation failed using the policies passed
*/
- public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException {
- Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies);
+ public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException {
LOGGER.debug("Starting apex engine.");
- try {
- apexMain = new ApexMain(policyArgsMap);
- } catch (ApexException e) {
- throw new ApexStarterException(e);
+ apexMainMap = initiateApexEngineForPolicies(policies);
+ if (apexMainMap.isEmpty()) {
+ ModelService.clear();
+ ParameterService.clear();
+ throw new ApexStarterException("Apex Engine failed to start.");
}
}
@@ -74,20 +97,136 @@ public class ApexEngineHandler {
* @throws ApexStarterException if the apex engine instantiation failed using the policies passed
*/
public void updateApexEngine(List<ToscaPolicy> policies) throws ApexStarterException {
- if (null == apexMain || !apexMain.isAlive()) {
- throw new ApexStarterException("Apex Engine not initialized.");
+ List<ToscaConceptIdentifier> runningPolicies = getRunningPolicies();
+ List<ToscaPolicy> policiesToDeploy = policies.stream()
+ .filter(policy -> !runningPolicies.contains(policy.getIdentifier())).collect(Collectors.toList());
+ List<ToscaConceptIdentifier> policiesToUnDeploy = runningPolicies.stream()
+ .filter(polId -> policies.stream().noneMatch(policy -> policy.getIdentifier().equals(polId)))
+ .collect(Collectors.toList());
+ Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap = new LinkedHashMap<>();
+ policiesToUnDeploy.forEach(policyId -> {
+ ApexMain apexMain = apexMainMap.get(policyId);
+ try {
+ apexMain.shutdown();
+ undeployedPoliciesMainMap.put(policyId, apexMain);
+ apexMainMap.remove(policyId);
+ } catch (ApexException e) {
+ LOGGER.error("Shutting down policy {} failed", policyId, e);
+ }
+ });
+ if (!undeployedPoliciesMainMap.isEmpty()) {
+ updateModelAndParameterServices(undeployedPoliciesMainMap);
}
- Map<ToscaConceptIdentifier, String[]> policyArgsMap = createPolicyArgsMap(policies);
- try {
- apexMain.updateModel(policyArgsMap);
- } catch (ApexException e) {
- throw new ApexStarterException(e);
+ if (!policiesToDeploy.isEmpty()) {
+ Map<ToscaConceptIdentifier, ApexMain> mainMap = initiateApexEngineForPolicies(policiesToDeploy);
+ if (mainMap.isEmpty()) {
+ throw new ApexStarterException("Updating the APEX engine with new policies failed.");
+ }
+ apexMainMap.putAll(mainMap);
+ }
+ if (apexMainMap.isEmpty()) {
+ ModelService.clear();
+ ParameterService.clear();
+ }
+ }
+
+ /**
+ * Clear the corresponding items from ModelService and ParameterService.
+ *
+ * @param undeployedPoliciesMainMap the policies that are undeployed
+ */
+ private void updateModelAndParameterServices(Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap) {
+ Set<String> inputParamKeysToRetain = new HashSet<>();
+ Set<String> outputParamKeysToRetain = new HashSet<>();
+ List<TaskParameters> taskParametersToRetain = new ArrayList<>();
+ List<String> executorParamKeysToRetain = new ArrayList<>();
+ List<String> schemaParamKeysToRetain = new ArrayList<>();
+
+ List<AxArtifactKey> keyInfoKeystoRetain = new ArrayList<>();
+ List<AxArtifactKey> schemaKeystoRetain = new ArrayList<>();
+ List<AxArtifactKey> eventKeystoRetain = new ArrayList<>();
+ List<AxArtifactKey> albumKeystoRetain = new ArrayList<>();
+ List<AxArtifactKey> taskKeystoRetain = new ArrayList<>();
+ List<AxArtifactKey> policyKeystoRetain = new ArrayList<>();
+
+ apexMainMap.values().forEach(main -> {
+ inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet());
+ outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet());
+ taskParametersToRetain.addAll(
+ main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters());
+ executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
+ .getExecutorParameterMap().keySet());
+ schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
+ .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet());
+
+ keyInfoKeystoRetain
+ .addAll(main.getActivator().getPolicyModel().getKeyInformation().getKeyInfoMap().keySet());
+ schemaKeystoRetain.addAll(main.getActivator().getPolicyModel().getSchemas().getSchemasMap().keySet());
+ eventKeystoRetain.addAll(main.getActivator().getPolicyModel().getEvents().getEventMap().keySet());
+ albumKeystoRetain.addAll(main.getActivator().getPolicyModel().getAlbums().getAlbumsMap().keySet());
+ taskKeystoRetain.addAll(main.getActivator().getPolicyModel().getTasks().getTaskMap().keySet());
+ policyKeystoRetain.addAll(main.getActivator().getPolicyModel().getPolicies().getPolicyMap().keySet());
+ });
+ for (ApexMain main : undeployedPoliciesMainMap.values()) {
+ ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
+ List<String> eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet()
+ .stream().filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList());
+ List<String> eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet()
+ .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList());
+ eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove);
+ eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove);
+ EngineParameters engineParameters =
+ main.getApexParameters().getEngineServiceParameters().getEngineParameters();
+ final List<TaskParameters> taskParametersToRemove = engineParameters.getTaskParameters().stream()
+ .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList());
+ final List<String> executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream()
+ .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList());
+ final List<String> schemaParamKeysToRemove =
+ engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()
+ .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList());
+ EngineParameters aggregatedEngineParameters =
+ existingParameters.getEngineServiceParameters().getEngineParameters();
+ aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove);
+ executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove);
+ schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters()
+ .getSchemaHelperParameterMap()::remove);
+
+ final AxPolicyModel policyModel = main.getActivator().getPolicyModel();
+ final List<AxArtifactKey> keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet()
+ .stream().filter(key -> !keyInfoKeystoRetain.contains(key)).collect(Collectors.toList());
+ final List<AxArtifactKey> schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream()
+ .filter(key -> !schemaKeystoRetain.contains(key)).collect(Collectors.toList());
+ final List<AxArtifactKey> eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream()
+ .filter(key -> !eventKeystoRetain.contains(key)).collect(Collectors.toList());
+ final List<AxArtifactKey> albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream()
+ .filter(key -> !albumKeystoRetain.contains(key)).collect(Collectors.toList());
+ final List<AxArtifactKey> taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream()
+ .filter(key -> !taskKeystoRetain.contains(key)).collect(Collectors.toList());
+ final List<AxArtifactKey> policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream()
+ .filter(key -> !policyKeystoRetain.contains(key)).collect(Collectors.toList());
+
+ final Map<AxArtifactKey, AxKeyInfo> keyInfoMap =
+ ModelService.getModel(AxKeyInformation.class).getKeyInfoMap();
+ final Map<AxArtifactKey, AxContextSchema> schemasMap =
+ ModelService.getModel(AxContextSchemas.class).getSchemasMap();
+ final Map<AxArtifactKey, AxEvent> eventMap = ModelService.getModel(AxEvents.class).getEventMap();
+ final Map<AxArtifactKey, AxContextAlbum> albumsMap =
+ ModelService.getModel(AxContextAlbums.class).getAlbumsMap();
+ final Map<AxArtifactKey, AxTask> taskMap = ModelService.getModel(AxTasks.class).getTaskMap();
+ final Map<AxArtifactKey, AxPolicy> policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap();
+
+ keyInfoKeystoRemove.forEach(keyInfoMap::remove);
+ schemaKeystoRemove.forEach(schemasMap::remove);
+ eventKeystoRemove.forEach(eventMap::remove);
+ albumKeystoRemove.forEach(albumsMap::remove);
+ taskKeystoRemove.forEach(taskMap::remove);
+ policyKeystoRemove.forEach(policyMap::remove);
}
}
- private Map<ToscaConceptIdentifier, String[]> createPolicyArgsMap(List<ToscaPolicy> policies)
+ private Map<ToscaConceptIdentifier, ApexMain> initiateApexEngineForPolicies(List<ToscaPolicy> policies)
throws ApexStarterException {
- Map<ToscaConceptIdentifier, String[]> policyArgsMap = new LinkedHashMap<>();
+ Map<ToscaConceptIdentifier, ApexMain> mainMap = new LinkedHashMap<>();
for (ToscaPolicy policy : policies) {
String policyName = policy.getIdentifier().getName();
final StandardCoder standardCoder = new StandardCoder();
@@ -103,34 +242,38 @@ public class ApexEngineHandler {
throw new ApexStarterException(e);
}
final String[] apexArgs = {"-p", file.getAbsolutePath()};
- policyArgsMap.put(policy.getIdentifier(), apexArgs);
+ LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier());
+ try {
+ ApexMain apexMain = new ApexMain(apexArgs);
+ mainMap.put(policy.getIdentifier(), apexMain);
+ } catch (Exception e) {
+ LOGGER.error("Execution of policy {} failed", policy.getIdentifier(), e);
+ }
}
- return policyArgsMap;
+ return mainMap;
}
/**
* Method to get the APEX engine statistics.
*/
public List<AxEngineModel> getEngineStats() {
- List<AxEngineModel> engineStats = null;
- if (null != apexMain && apexMain.isAlive()) {
- engineStats = apexMain.getEngineStats();
- }
- return engineStats;
+ // engineStats from all the apexMain instances running individual tosca policies are combined here.
+ return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive()))
+ .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList());
}
/**
* Method to check whether the apex engine is running or not.
*/
public boolean isApexEngineRunning() {
- return null != apexMain && apexMain.isAlive();
+ return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive()));
}
/**
* Method that return the list of running policies in the apex engine.
*/
public List<ToscaConceptIdentifier> getRunningPolicies() {
- return new ArrayList<>(apexMain.getApexParametersMap().keySet());
+ return new ArrayList<>(apexMainMap.keySet());
}
/**
@@ -139,8 +282,12 @@ public class ApexEngineHandler {
public void shutdown() throws ApexStarterException {
try {
LOGGER.debug("Shutting down apex engine.");
- apexMain.shutdown();
- apexMain = null;
+ for (ApexMain apexMain : apexMainMap.values()) {
+ apexMain.shutdown();
+ }
+ apexMainMap.clear();
+ ModelService.clear();
+ ParameterService.clear();
} catch (final ApexException e) {
throw new ApexStarterException(e);
}