aboutsummaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@bell.ca>2020-06-30 17:50:04 +0100
committera.sreekumar <ajith.sreekumar@bell.ca>2020-07-01 12:35:51 +0100
commit9519d1257b4fe4fbb68d3e9ab155de8ff0f13052 (patch)
tree9030d2c297c0589bb4186842aeb6c74951a1018d /services
parentd0799660ee9714a33247d3be2d7841b1cedf75b8 (diff)
Fixing ConcurrentModificationException during multiple policy deployment in APEX
Change-Id: Ib39e798d733727bdc676755b66adf2c499e618af Issue-ID: POLICY-2655 Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'services')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java58
1 files changed, 34 insertions, 24 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index 9b4176a01..4d84fa394 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -2,6 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,14 +23,19 @@
package org.onap.policy.apex.service.engine.main;
import java.io.IOException;
+import java.util.AbstractMap;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Stream;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
@@ -116,12 +122,13 @@ public class ApexActivator {
LOGGER.debug("Apex engine started as a service");
}
- private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
+ private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
policyModelsMap = new LinkedHashMap<>();
Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
-
- for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
+ Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet =
+ new LinkedHashSet<>(apexParametersMap.entrySet());
+ apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
ApexParameters apexParams = apexParamsEntry.getValue();
boolean duplicateInputParameterExist =
apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
@@ -131,7 +138,7 @@ public class ApexActivator {
LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
apexParametersMap.remove(apexParamsEntry.getKey());
- continue;
+ return;
}
inputParametersMap.putAll(apexParams.getEventInputParameters());
outputParametersMap.putAll(apexParams.getEventOutputParameters());
@@ -139,14 +146,17 @@ public class ApexActivator {
if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
apexParams.getEngineServiceParameters().getPolicyModelFileName());
-
- final String policyModelString =
- TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
- AxPolicyModel policyModel = EngineServiceImpl
- .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
- policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
+ try {
+ final String policyModelString = TextFileUtils
+ .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ AxPolicyModel policyModel = EngineServiceImpl
+ .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+ policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
+ } catch (ApexException | IOException e) {
+ throw new ApexRuntimeException("Failed to create the apex model.", e);
+ }
}
- }
+ });
AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
// Set the policy model in the engine
@@ -164,26 +174,26 @@ public class ApexActivator {
}
private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
- Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
- ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
- AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
- Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
- policyModelsMap.entrySet().stream().skip(1);
- Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
- policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
+ // Doing a deep copy so that original values in policyModelsMap is retained after reduction operation
+ Set<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
+ .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
+ Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry =
+ policyModelsEntries.stream().reduce((entry1, entry2) -> {
try {
entry1.setValue(
PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
} catch (ApexModelException exc) {
- LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+ LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
apexParametersMap.remove(entry2.getKey());
policyModelsMap.remove(entry2.getKey());
}
return entry1;
- }));
- AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
- policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
+ });
+ AxPolicyModel finalPolicyModel = null;
+ if (finalPolicyModelEntry.isPresent()) {
+ finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
+ }
return finalPolicyModel;
}
@@ -233,7 +243,7 @@ public class ApexActivator {
* Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
* because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
* we'll also find its paired marshaler
- *
+ *
* @param inputParametersMap the apex parameters
*/
private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {