aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@bell.ca>2021-01-25 11:03:42 +0000
committera.sreekumar <ajith.sreekumar@bell.ca>2021-01-28 13:14:51 +0000
commit658e67bc821a3bc55f2c6d877e7e0baa21427333 (patch)
tree5ab53c6026317858aec8b61bf0aec37459247cf5 /services/services-engine/src/main/java/org/onap
parent84f92b44e70ce27bb4213da677d50ac91169432c (diff)
Improve handling of multiple policy in APEX PDP
Change-Id: Ic4adf5bd8876dc31fc93993298e90389baaa2c39 Issue-ID: POLICY-2883 Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services/services-engine/src/main/java/org/onap')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java278
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java13
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java175
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java4
4 files changed, 182 insertions, 288 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index cadcc7ee8..3451c120c 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,35 +22,41 @@
package org.onap.policy.apex.service.engine.main;
-import java.util.AbstractMap;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo;
+import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicies;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicy;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxTask;
+import org.onap.policy.apex.model.policymodel.concepts.AxTasks;
import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
+import org.onap.policy.apex.service.parameters.ApexParameterConstants;
import org.onap.policy.apex.service.parameters.ApexParameters;
import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.onap.policy.common.parameters.ParameterService;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
@@ -66,13 +72,12 @@ public class ApexActivator {
// The logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
- // The parameters of the Apex activator when running with multiple policies
@Getter
@Setter
- private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap;
+ private ApexParameters apexParameters;
@Getter
- Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap;
+ private AxPolicyModel policyModel;
// Event unmarshalers are used to receive events asynchronously into Apex
private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
@@ -93,10 +98,10 @@ public class ApexActivator {
/**
* Instantiate the activator for the Apex engine as a complete service.
*
- * @param parametersMap the apex parameters map for the Apex service
+ * @param parameters the apex parameters for the Apex service
*/
- public ApexActivator(Map<ToscaConceptIdentifier, ApexParameters> parametersMap) {
- apexParametersMap = parametersMap;
+ public ApexActivator(ApexParameters parameters) {
+ apexParameters = parameters;
}
/**
@@ -108,96 +113,131 @@ public class ApexActivator {
LOGGER.debug("Apex engine starting as a service . . .");
try {
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- // totalInstanceCount is the sum of instance counts required as per each policy
- int totalInstanceCount = apexParametersMap.values().stream()
- .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
- apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
- engineKey = apexParameters.getEngineServiceParameters().getEngineKey();
instantiateEngine(apexParameters);
setUpModelMarshallerAndUnmarshaller(apexParameters);
} catch (final Exception e) {
- LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
+ try {
+ terminate();
+ } catch (ApexException e1) {
+ LOGGER.error("Terminating the ApexActivator encountered error", e1);
+ }
throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
}
LOGGER.debug("Apex engine started as a service");
}
- private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
- policyModelsMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
- Set<Entry<ToscaConceptIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>(
- apexParametersMap.entrySet());
- apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
- ApexParameters apexParams = apexParamsEntry.getValue();
- List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet());
- duplicateInputParameters.retainAll(inputParametersMap.keySet());
- List<String> duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet());
- duplicateOutputParameters.retainAll(outputParametersMap.keySet());
-
- if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
- LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.",
- duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(),
- apexParamsEntry.getKey().getVersion());
- apexParametersMap.remove(apexParamsEntry.getKey());
- return;
- }
- inputParametersMap.putAll(apexParams.getEventInputParameters());
- outputParametersMap.putAll(apexParams.getEventOutputParameters());
- // Check if a policy model file has been specified
- if (apexParams.getEngineServiceParameters().getPolicyModel() != null) {
- LOGGER.debug("deploying policy model to the apex engines . . .");
- try {
- final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel();
- AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString);
- policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
- } catch (ApexException e) {
- throw new ApexRuntimeException("Failed to create the apex model.", e);
- }
- }
- });
- AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+ private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
+ if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
+ throw new ApexException("Apex Engine already initialized.");
+ }
+ // Create engine with specified thread count
+ LOGGER.debug("starting apex engine service . . .");
+ apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
+
+ // Create the engine holder to hold the engine's references and act as an event
+ // receiver
+ engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
+ }
+
+ private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
+ AxPolicyModel model;
+ try {
+ final String policyModelString = apexParameters.getEngineServiceParameters().getPolicyModel();
+ model = EngineServiceImpl.createModel(apexParameters.getEngineServiceParameters().getEngineKey(),
+ policyModelString);
+ } catch (ApexException e) {
+ throw new ApexRuntimeException("Failed to create the apex model.", e);
+ }
+ AxKeyInformation existingKeyInformation = null;
+ AxContextSchemas existingSchemas = null;
+ AxEvents existingEvents = null;
+ AxContextAlbums existingAlbums = null;
+ AxTasks existingTasks = null;
+ AxPolicies existingPolicies = null;
+ if (ModelService.existsModel(AxPolicyModel.class)) {
+ existingKeyInformation = new AxKeyInformation(ModelService.getModel(AxKeyInformation.class));
+ existingSchemas = new AxContextSchemas(ModelService.getModel(AxContextSchemas.class));
+ existingEvents = new AxEvents(ModelService.getModel(AxEvents.class));
+ existingAlbums = new AxContextAlbums(ModelService.getModel(AxContextAlbums.class));
+ existingTasks = new AxTasks(ModelService.getModel(AxTasks.class));
+ existingPolicies = new AxPolicies(ModelService.getModel(AxPolicies.class));
+ }
// Set the policy model in the engine
- apexEngineService.updateModel(engineKey, finalPolicyModel, true);
+ try {
+ apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), model, true);
+ policyModel = new AxPolicyModel(model);
+ } catch (ApexException exp) {
+ // Discard concepts from the current policy
+ if (null != existingKeyInformation) {
+ // Update model service with other policies' concepts.
+ ModelService.registerModel(AxKeyInformation.class, existingKeyInformation);
+ ModelService.registerModel(AxContextSchemas.class, existingSchemas);
+ ModelService.registerModel(AxEvents.class, existingEvents);
+ ModelService.registerModel(AxContextAlbums.class, existingAlbums);
+ ModelService.registerModel(AxTasks.class, existingTasks);
+ ModelService.registerModel(AxPolicies.class, existingPolicies);
+ } else {
+ ModelService.clear();
+ }
+ throw exp;
+ }
+ if (null != existingKeyInformation) {
+ // Make sure all concepts in previously deployed policies are retained in ModelService
+ // during multi policy deployment.
+ updateModelService(existingKeyInformation, existingSchemas, existingEvents, existingAlbums, existingTasks,
+ existingPolicies);
+ }
- handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap);
- setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
- outputParametersMap);
+ setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(),
+ apexParameters.getEventInputParameters(), apexParameters.getEventOutputParameters());
// Wire up pairings between marhsallers and unmarshallers
- setUpMarshalerPairings(inputParametersMap);
+ setUpMarshalerPairings(apexParameters.getEventInputParameters());
// Start event processing
- startUnmarshallers(inputParametersMap);
+ startUnmarshallers(apexParameters.getEventInputParameters());
}
- private AxPolicyModel aggregatePolicyModels(Map<ToscaConceptIdentifier, AxPolicyModel> policyModelsMap) {
- // Doing a deep copy so that original values in policyModelsMap is retained
- // after reduction operation
- Set<Entry<ToscaConceptIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
- .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
- Optional<Entry<ToscaConceptIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream()
- .reduce((entry1, entry2) -> {
- try {
- entry1.setValue(
- PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
- } catch (ApexModelException exc) {
- LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
- entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
- apexParametersMap.remove(entry2.getKey());
- policyModelsMap.remove(entry2.getKey());
- }
- return entry1;
- });
- AxPolicyModel finalPolicyModel = null;
- if (finalPolicyModelEntry.isPresent()) {
- finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
+ private void updateModelService(AxKeyInformation existingKeyInformation,
+ AxContextSchemas existingSchemas, AxEvents existingEvents,
+ AxContextAlbums existingAlbums, AxTasks existingTasks,
+ AxPolicies existingPolicies) throws ApexModelException {
+
+ AxContextSchemas axContextSchemas = ModelService.getModel(AxContextSchemas.class);
+ AxEvents axEvents = ModelService.getModel(AxEvents.class);
+ AxContextAlbums axContextAlbums = ModelService.getModel(AxContextAlbums.class);
+ AxTasks axTasks = ModelService.getModel(AxTasks.class);
+ AxPolicies axPolicies = ModelService.getModel(AxPolicies.class);
+
+ Map<AxArtifactKey, AxContextSchema> newSchemasMap = axContextSchemas.getSchemasMap();
+ Map<AxArtifactKey, AxEvent> newEventsMap = axEvents.getEventMap();
+ Map<AxArtifactKey, AxContextAlbum> newAlbumsMap = axContextAlbums.getAlbumsMap();
+ Map<AxArtifactKey, AxTask> newTasksMap = axTasks.getTaskMap();
+ Map<AxArtifactKey, AxPolicy> newPoliciesMap = axPolicies.getPolicyMap();
+
+ StringBuilder errorMessage = new StringBuilder();
+ PolicyModelMerger.checkForDuplicateItem(existingSchemas.getSchemasMap(), newSchemasMap, errorMessage, "schema");
+ PolicyModelMerger.checkForDuplicateItem(existingEvents.getEventMap(), newEventsMap, errorMessage, "event");
+ PolicyModelMerger.checkForDuplicateItem(existingAlbums.getAlbumsMap(), newAlbumsMap, errorMessage, "album");
+ PolicyModelMerger.checkForDuplicateItem(existingTasks.getTaskMap(), newTasksMap, errorMessage, "task");
+ PolicyModelMerger.checkForDuplicateItem(existingPolicies.getPolicyMap(), newPoliciesMap, errorMessage,
+ "policy");
+ if (errorMessage.length() > 0) {
+ throw new ApexModelException(errorMessage.toString());
}
- return finalPolicyModel;
+
+ AxKeyInformation axKeyInformation = ModelService.getModel(AxKeyInformation.class);
+ Map<AxArtifactKey, AxKeyInfo> newKeyInfoMap = axKeyInformation.getKeyInfoMap();
+ // Now add all the concepts that must be copied over
+ newKeyInfoMap.putAll(existingKeyInformation.getKeyInfoMap());
+ newSchemasMap.putAll(existingSchemas.getSchemasMap());
+ newEventsMap.putAll(existingEvents.getEventMap());
+ newAlbumsMap.putAll(existingAlbums.getAlbumsMap());
+ newTasksMap.putAll(existingTasks.getTaskMap());
+ newPoliciesMap.putAll(existingPolicies.getPolicyMap());
}
private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
@@ -226,38 +266,6 @@ public class ApexActivator {
}
}
- private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
- Map<String, EventHandlerParameters> outputParametersMap) {
- // stop and remove any marshaller/unmarshaller that is part of a policy that is
- // undeployed
- marshallerMap.entrySet().stream()
- .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
- .forEach(marshallerEntry -> marshallerEntry.getValue().stop());
- marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey));
- unmarshallerMap.entrySet().stream()
- .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey()))
- .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
- unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
-
- // If a marshaller/unmarshaller is already initialized, they don't need to be
- // reinitialized during model update.
- outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
- inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
- }
-
- private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
- if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
- throw new ApexException("Apex Engine already initialized.");
- }
- // Create engine with specified thread count
- LOGGER.debug("starting apex engine service . . .");
- apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
-
- // Create the engine holder to hold the engine's references and act as an event
- // receiver
- engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
- }
-
/**
* Set up unmarshaler/marshaler pairing for synchronized event handling. We only
* need to traverse the unmarshalers because the unmarshalers and marshalers are
@@ -275,8 +283,8 @@ public class ApexActivator {
// Check if the unmarshaler is synchronized with a marshaler
if (inputParameters.getValue().isPeeredMode(peeredMode)) {
// Find the unmarshaler and marshaler
- final ApexEventMarshaller peeredMarshaler = marshallerMap
- .get(inputParameters.getValue().getPeer(peeredMode));
+ final ApexEventMarshaller peeredMarshaler =
+ marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
// Connect the unmarshaler and marshaler
unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
@@ -298,22 +306,6 @@ public class ApexActivator {
}
/**
- * Updates the APEX Engine with the model created from new Policies.
- *
- * @param apexParamsMap the apex parameters map for the Apex service
- * @throws ApexException on errors
- */
- public void updateModel(Map<ToscaConceptIdentifier, ApexParameters> apexParamsMap) throws ApexException {
- try {
- ApexParameters apexParameters = apexParamsMap.values().iterator().next();
- setUpModelMarshallerAndUnmarshaller(apexParameters);
- } catch (final Exception e) {
- LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
- throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
- }
- }
-
- /**
* Get the Apex engine worker stats.
*/
public List<AxEngineModel> getEngineStats() {
@@ -338,19 +330,25 @@ public class ApexActivator {
engineServiceHandler.terminate();
engineServiceHandler = null;
}
-
- // Clear the services
- ModelService.clear();
- ParameterService.clear();
+ // Clear the services in case if this was the only policy running in the engine
+ ApexParameters apexParams = null;
+ if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) {
+ apexParams = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
+ }
+ if (null != apexParams
+ && apexParameters.getEventInputParameters().size() == apexParams.getEventInputParameters().size()) {
+ ModelService.clear();
+ ParameterService.clear();
+ }
}
/**
* Shuts down all marshallers and unmarshallers.
*/
private void shutdownMarshallerAndUnmarshaller() {
- marshallerMap.values().forEach(ApexEventMarshaller::stop);
- marshallerMap.clear();
unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
unmarshallerMap.clear();
+ marshallerMap.values().forEach(ApexEventMarshaller::stop);
+ marshallerMap.clear();
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
index fd6ac4489..fdc404c67 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -347,17 +347,16 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Order the stop
stopOrderedFlag = true;
- // Order a stop on the synchronous cache if one exists
- if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
- && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
- ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
- }
-
// Wait for thread shutdown
while (unmarshallerThread != null && unmarshallerThread.isAlive()) {
ThreadUtilities.sleep(UNMARSHALLER_SHUTDOWN_WAIT_INTERVAL);
}
+ // Order a stop on the synchronous cache if one exists
+ if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
+ && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
+ ((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
+ }
LOGGER.exit("shut down Apex event unmarshaller");
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
index 5707c95ad..65c2acffa 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modification Copyright (C) 2019-2021 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,26 +24,20 @@ package org.onap.policy.apex.service.engine.main;
import java.util.Arrays;
import java.util.Base64;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.core.engine.EngineParameters;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
-import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
-import org.onap.policy.apex.model.basicmodel.service.ModelService;
-import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
-import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
-import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.service.parameters.ApexParameterConstants;
import org.onap.policy.apex.service.parameters.ApexParameterHandler;
import org.onap.policy.apex.service.parameters.ApexParameters;
+import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.common.parameters.ParameterService;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
@@ -58,14 +52,12 @@ public class ApexMain {
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexMain.class);
// The Apex Activator that activates the Apex engine
+ @Getter
private ApexActivator activator;
// The parameters read in from JSON for each policy
@Getter
- private Map<ToscaConceptIdentifier, ApexParameters> apexParametersMap;
-
- //engineParameters are aggregated in case of multiple policies
- private EngineParameters aggregatedEngineParameters;
+ private ApexParameters apexParameters;
private ApexParameterHandler apexParameterHandler = new ApexParameterHandler();
@@ -77,73 +69,27 @@ public class ApexMain {
* Instantiates the Apex service.
*
* @param args the command line arguments
+ * @throws ApexException the apex exception.
*/
- public ApexMain(final String[] args) {
+ public ApexMain(final String[] args) throws ApexException {
LOGGER.entry("Starting Apex service with parameters " + Arrays.toString(args) + " . . .");
- apexParametersMap = new LinkedHashMap<>();
- aggregatedEngineParameters = new EngineParameters();
try {
- apexParametersMap.put(new ToscaConceptIdentifier(), populateApexParameters(args));
+ apexParameters = populateApexParameters(args);
} catch (ApexException e) {
LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
return;
}
- apexParameterHandler.registerParameters(apexParametersMap.values().iterator().next());
- // Now, create the activator for the Apex service
- activator = new ApexActivator(apexParametersMap);
-
- // Start the activator
- try {
- activator.initialize();
- setAlive(true);
- } catch (final ApexActivatorException e) {
- LOGGER.error("start of Apex service failed, used parameters are " + Arrays.toString(args), e);
- return;
- }
-
- // Add a shutdown hook to shut everything down in an orderly manner
- Runtime.getRuntime().addShutdownHook(new ApexMainShutdownHookClass());
- LOGGER.exit("Started Apex");
- }
-
- /**
- * Instantiates the Apex service for multiple policies.
- *
- * @param policyArgumentsMap the map with command line arguments as value and policy-id as key
- * @throws ApexException on errors
- */
- public ApexMain(Map<ToscaConceptIdentifier, String[]> policyArgumentsMap) throws ApexException {
- apexParametersMap = new LinkedHashMap<>();
- aggregatedEngineParameters = new EngineParameters();
- for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry: policyArgumentsMap.entrySet()) {
- try {
- apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
- } catch (ApexException e) {
- LOGGER.error("Invalid arguments specified for policy - " + policyArgsEntry.getKey().getName() + ":"
- + policyArgsEntry.getKey().getVersion(), e);
- }
- }
- if (apexParametersMap.isEmpty()) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG);
- return;
- }
+ aggregateParametersAndRegister();
- // Set the aggregated engineParameters which will be used later while creating the engine
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
- apexParameterHandler.registerParameters(apexParameters);
// Now, create the activator for the Apex service
- activator = new ApexActivator(apexParametersMap);
+ activator = new ApexActivator(apexParameters);
// Start the activator
try {
activator.initialize();
- apexParametersMap = activator.getApexParametersMap();
setAlive(true);
} catch (final ApexActivatorException e) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
- activator.terminate();
- return;
+ throw new ApexException("start of Apex service failed, used parameters are " + Arrays.toString(args), e);
}
// Add a shutdown hook to shut everything down in an orderly manner
@@ -151,75 +97,6 @@ public class ApexMain {
LOGGER.exit("Started Apex");
}
- /**
- * Updates the APEX Engine with the model created from new Policies.
- *
- * @param policyArgsMap the map with command line arguments as value and policy-id as key
- * @throws ApexException on errors
- */
- public void updateModel(Map<ToscaConceptIdentifier, String[]> policyArgsMap) throws ApexException {
- // flag that determines if any policy received in PDP_UPDATE is already deployed in the engine
- boolean isAnyPolicyDeployed = policyArgsMap.keySet().stream().anyMatch(apexParametersMap::containsKey);
- apexParametersMap.clear();
- aggregatedEngineParameters = new EngineParameters();
- AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class);
- Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>();
- for (Entry<ToscaConceptIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) {
- findAlbumsToHold(albumsMap, policyArgsEntry.getKey());
- try {
- apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
- } catch (ApexException e) {
- LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(),
- policyArgsEntry.getKey().getVersion(), e);
- }
- }
- // Set the aggregated engineParameters
- ApexParameters apexParameters = apexParametersMap.values().iterator().next();
- apexParameters.getEngineServiceParameters().setEngineParameters(aggregatedEngineParameters);
- ParameterService.clear();
- apexParameterHandler.registerParameters(apexParameters);
- try {
- if (albumsMap.isEmpty() && !isAnyPolicyDeployed) {
- // clear context since none of the policies' context albums has to be retained
- // this could be because all policies have a major version change,
- // or a full set of new policies are received in the update message
- activator.terminate();
- // ParameterService is cleared when activator is terminated. Register the parameters again in this case
- apexParameterHandler.registerParameters(apexParameters);
- activator = new ApexActivator(apexParametersMap);
- activator.initialize();
- setAlive(true);
- } else {
- albums.setAlbumsMap(albumsMap);
- activator.setApexParametersMap(apexParametersMap);
- activator.updateModel(apexParametersMap);
- }
- } catch (final ApexException e) {
- LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
- activator.terminate();
- throw new ApexException(e.getMessage());
- }
- }
-
- /**
- * Find the context albums which should be retained when updating the policies.
- *
- * @param albumsMap the albums which should be kept during model update
- * @param policyId the policy id of current policy
- */
- private void findAlbumsToHold(Map<AxArtifactKey, AxContextAlbum> albumsMap, ToscaConceptIdentifier policyId) {
- for (Entry<ToscaConceptIdentifier, AxPolicyModel> policyModelsEntry : activator.getPolicyModelsMap()
- .entrySet()) {
- // If a policy with the same major version is received in PDP_UPDATE,
- // context for that policy has to be retained. For this, take such policies' albums
- if (policyModelsEntry.getKey().getName().equals(policyId.getName())
- && policyModelsEntry.getKey().getVersion().split("\\.")[0]
- .equals(policyId.getVersion().split("\\.")[0])) {
- albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap());
- }
- }
- }
-
private ApexParameters populateApexParameters(String[] args) throws ApexException {
// Check the arguments
final ApexCommandLineArguments arguments = new ApexCommandLineArguments();
@@ -263,11 +140,32 @@ public class ApexMain {
ehParameterEntry.getValue().setName(ehParameterEntry.getKey());
}
}
- aggregateEngineParameters(axParameters.getEngineServiceParameters().getEngineParameters());
return axParameters;
}
- private void aggregateEngineParameters(EngineParameters engineParameters) {
+ private void aggregateParametersAndRegister() throws ApexException {
+ ApexParameters aggregatedParameters = null;
+ if (ParameterService.contains(ApexParameterConstants.MAIN_GROUP_NAME)) {
+ aggregatedParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
+ } else {
+ aggregatedParameters = new ApexParameters();
+ aggregatedParameters.setEngineServiceParameters(new EngineServiceParameters());
+ apexParameterHandler.registerParameters(aggregatedParameters);
+ }
+ List<String> duplicateInputParameters = aggregatedParameters.getEventInputParameters().keySet().stream()
+ .filter(apexParameters.getEventInputParameters()::containsKey).collect(Collectors.toList());
+ List<String> duplicateOutputParameters = aggregatedParameters.getEventOutputParameters().keySet().stream()
+ .filter(apexParameters.getEventOutputParameters()::containsKey).collect(Collectors.toList());
+ if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
+ throw new ApexException(
+ "start of Apex service failed because this policy has the following duplicate I/O parameters: "
+ + duplicateInputParameters + "/" + duplicateOutputParameters);
+ }
+ aggregatedParameters.getEventInputParameters().putAll(apexParameters.getEventInputParameters());
+ aggregatedParameters.getEventOutputParameters().putAll(apexParameters.getEventOutputParameters());
+ EngineParameters aggregatedEngineParameters =
+ aggregatedParameters.getEngineServiceParameters().getEngineParameters();
+ EngineParameters engineParameters = apexParameters.getEngineServiceParameters().getEngineParameters();
aggregatedEngineParameters.getTaskParameters().addAll(engineParameters.getTaskParameters());
aggregatedEngineParameters.getExecutorParameterMap().putAll(engineParameters.getExecutorParameterMap());
aggregatedEngineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap()
@@ -346,8 +244,9 @@ public class ApexMain {
* The main method.
*
* @param args the arguments
+ * @throws ApexException the apex exception.
*/
- public static void main(final String[] args) {
+ public static void main(final String[] args) throws ApexException {
new ApexMain(args);
}
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
index 9a07b4ad5..673d9cab5 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
- * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -73,8 +73,6 @@ public class ApexParameterHandler {
*/
public ApexParameters getParameters(final ApexCommandLineArguments arguments) throws ParameterException {
- ParameterService.clear();
-
ApexParameters parameters = null;
String toscaPolicyFilePath = arguments.getToscaPolicyFilePath();
// Read the parameters