aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-engine
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@est.tech>2019-11-05 14:37:09 +0000
committera.sreekumar <ajith.sreekumar@est.tech>2019-11-14 12:15:55 +0000
commit40a1f22ff8d28e78b6512c0a10d454b37f015fdb (patch)
tree8d92efb03c45615bf8697db4adcee558a80eb9e7 /services/services-engine
parent8c95a09fd412c89b1eaf7d0658005ffba24025bd (diff)
Retaining context in APEX Engine based on policies received in PdpUpdate
Change-Id: I73fad5bf76ed6b4979f5ab76013f204ea82da30b Issue-ID: POLICY-2215 Signed-off-by: a.sreekumar <ajith.sreekumar@est.tech>
Diffstat (limited to 'services/services-engine')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java137
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java67
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java20
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java17
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java4
5 files changed, 176 insertions, 69 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index 2c3fac151..6c86c1eff 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -21,11 +21,13 @@
package org.onap.policy.apex.service.engine.main;
+import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Stream;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
@@ -59,8 +61,12 @@ public class ApexActivator {
// The parameters of the Apex activator when running with multiple policies
@Getter
+ @Setter
private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
+ @Getter
+ Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
+
// Event unmarshalers are used to receive events asynchronously into Apex
private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
@@ -99,46 +105,7 @@ public class ApexActivator {
.mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
instantiateEngine(apexParameters);
- Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
- Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
-
- for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
- ApexParameters apexParams = apexParamsEntry.getValue();
- boolean duplicateInputParameterExist =
- apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
- boolean duplicateOutputParameterExist =
- apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
- if (duplicateInputParameterExist || duplicateOutputParameterExist) {
- LOGGER.error("I/O Parameters for " + apexParamsEntry.getKey().getName() + ":"
- + apexParamsEntry.getKey().getVersion()
- + " has duplicates. So this policy is not executed");
- apexParametersMap.remove(apexParamsEntry.getKey());
- continue;
- } else {
- inputParametersMap.putAll(apexParams.getEventInputParameters());
- outputParametersMap.putAll(apexParams.getEventOutputParameters());
- }
- // Check if a policy model file has been specified
- if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
- LOGGER.debug("deploying policy model in \""
- + apexParams.getEngineServiceParameters().getPolicyModelFileName()
- + "\" to the apex engines . . .");
-
- final String policyModelString = TextFileUtils
- .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
- AxPolicyModel policyModel = EngineServiceImpl
- .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
- policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
- }
- }
- AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
- // Set the policy model in the engine
- apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(),
- finalPolicyModel, true);
- setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
- outputParametersMap);
- setUpmarshalerPairings(inputParametersMap);
+ setUpModelMarhsallerAndUnmarshaller(apexParameters);
} catch (final Exception e) {
LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
@@ -147,8 +114,50 @@ public class ApexActivator {
LOGGER.debug("Apex engine started as a service");
}
+ private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
+ policyModelsMap = new LinkedHashMap<>();
+ Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
+ Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
+
+ for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
+ ApexParameters apexParams = apexParamsEntry.getValue();
+ boolean duplicateInputParameterExist =
+ apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
+ boolean duplicateOutputParameterExist =
+ apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
+ if (duplicateInputParameterExist || duplicateOutputParameterExist) {
+ LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
+ apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
+ apexParametersMap.remove(apexParamsEntry.getKey());
+ continue;
+ }
+ inputParametersMap.putAll(apexParams.getEventInputParameters());
+ outputParametersMap.putAll(apexParams.getEventOutputParameters());
+ // Check if a policy model file has been specified
+ if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
+ LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
+ apexParams.getEngineServiceParameters().getPolicyModelFileName());
+
+ final String policyModelString =
+ TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ AxPolicyModel policyModel = EngineServiceImpl
+ .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+ policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
+ }
+ }
+ AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+ // Set the policy model in the engine
+ apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
+ true);
+ setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
+ outputParametersMap);
+ setUpMarshalerPairings(inputParametersMap);
+ }
+
private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
+ ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
+ AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
policyModelsMap.entrySet().stream().skip(1);
Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
@@ -157,13 +166,16 @@ public class ApexActivator {
entry1.setValue(
PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
} catch (ApexModelException exc) {
- LOGGER.error("Policy model for " + entry2.getKey().getName() + ":" + entry2.getKey().getVersion()
- + " is having duplicates. So this policy is not executed", exc.getMessage());
+ LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+ entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
apexParametersMap.remove(entry2.getKey());
+ policyModelsMap.remove(entry2.getKey());
}
return entry1;
}));
- return finalPolicyModelEntry.getValue();
+ AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
+ policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
+ return finalPolicyModel;
}
private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
@@ -215,7 +227,7 @@ public class ApexActivator {
* paired marshaler
* @param inputParametersMap the apex parameters
*/
- private void setUpmarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
+ private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
@@ -237,21 +249,30 @@ public class ApexActivator {
}
/**
+ * Updates the APEX Engine with the model created from new Policies.
+ *
+ * @param apexParamsMap the apex parameters map for the Apex service
+ * @throws ApexException on errors
+ */
+ public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
+ try {
+ shutdownMarshallerAndUnmarshaller();
+ ApexParameters apexParameters = apexParamsMap.values().iterator().next();
+ setUpModelMarhsallerAndUnmarshaller(apexParameters);
+ } catch (final Exception e) {
+ LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
+ throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
+ }
+ }
+
+ /**
* Terminate the Apex engine.
*
* @throws ApexException on termination errors
*/
public void terminate() throws ApexException {
// Shut down all marshalers and unmarshalers
- for (final ApexEventMarshaller marshaller : marshallerMap.values()) {
- marshaller.stop();
- }
- marshallerMap.clear();
-
- for (final ApexEventUnmarshaller unmarshaller : unmarshallerMap.values()) {
- unmarshaller.stop();
- }
- unmarshallerMap.clear();
+ shutdownMarshallerAndUnmarshaller();
// Check if the engine service handler has been shut down already
if (engineServiceHandler != null) {
@@ -263,4 +284,14 @@ public class ApexActivator {
ModelService.clear();
ParameterService.clear();
}
+
+ /**
+ * Shuts down all marshallers and unmarshallers.
+ */
+ private void shutdownMarshallerAndUnmarshaller() {
+ marshallerMap.values().forEach(ApexEventMarshaller::stop);
+ marshallerMap.clear();
+ unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
+ unmarshallerMap.clear();
+ }
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
index 4600690e7..14b57b2d1 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexMain.java
@@ -26,9 +26,15 @@ import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
+import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.service.parameters.ApexParameterHandler;
import org.onap.policy.apex.service.parameters.ApexParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
@@ -128,6 +134,67 @@ public class ApexMain {
LOGGER.exit("Started Apex");
}
+ /**
+ * Updates the APEX Engine with the model created from new Policies.
+ *
+ * @param policyArgsMap the map with command line arguments as value and policy-id as key
+ * @throws ApexException on errors
+ */
+ public void updateModel(Map<ToscaPolicyIdentifier, String[]> policyArgsMap) throws ApexException {
+ apexParametersMap.clear();
+ AxContextAlbums albums = ModelService.getModel(AxContextAlbums.class);
+ Map<AxArtifactKey, AxContextAlbum> albumsMap = new TreeMap<>();
+ for (Entry<ToscaPolicyIdentifier, String[]> policyArgsEntry : policyArgsMap.entrySet()) {
+ findAlbumsToHold(albumsMap, policyArgsEntry.getKey());
+ try {
+ apexParametersMap.put(policyArgsEntry.getKey(), populateApexParameters(policyArgsEntry.getValue()));
+ } catch (ApexException e) {
+ LOGGER.error("Invalid arguments specified for policy - {}:{}", policyArgsEntry.getKey().getName(),
+ policyArgsEntry.getKey().getVersion(), e);
+ }
+ }
+ try {
+ if (albumsMap.isEmpty()) {
+ // clear context since none of the policies' context albums has to be retained
+ // this could be because all policies have a major version change,
+ // or a full set of new policies are received in the update message
+ activator.terminate();
+ // ParameterService is cleared when activator is terminated. Register the engine parameters in this case
+ new ApexParameterHandler().registerParameters(apexParametersMap.values().iterator().next());
+ activator = new ApexActivator(apexParametersMap);
+ activator.initialize();
+ setAlive(true);
+ } else {
+ albums.setAlbumsMap(albumsMap);
+ activator.setApexParametersMap(apexParametersMap);
+ activator.updateModel(apexParametersMap);
+ }
+ } catch (final ApexException e) {
+ LOGGER.error(APEX_SERVICE_FAILED_MSG, e);
+ activator.terminate();
+ throw new ApexException(e.getMessage());
+ }
+ }
+
+ /**
+ * Find the context albums which should be retained when updating the policies.
+ *
+ * @param albumsMap the albums which should be kept during model update
+ * @param policyId the policy id of current policy
+ */
+ private void findAlbumsToHold(Map<AxArtifactKey, AxContextAlbum> albumsMap, ToscaPolicyIdentifier policyId) {
+ for (Entry<ToscaPolicyIdentifier, AxPolicyModel> policyModelsEntry : activator.getPolicyModelsMap()
+ .entrySet()) {
+ // If a policy with the same major version is received in PDP_UPDATE,
+ // context for that policy has to be retained. For this, take such policies' albums
+ if (policyModelsEntry.getKey().getName().equals(policyId.getName())
+ && policyModelsEntry.getKey().getVersion().split("\\.")[0]
+ .equals(policyId.getVersion().split("\\.")[0])) {
+ albumsMap.putAll(policyModelsEntry.getValue().getAlbums().getAlbumsMap());
+ }
+ }
+ }
+
private ApexParameters populateApexParameters(String[] args) throws ApexException {
// Check the arguments
final ApexCommandLineArguments arguments = new ApexCommandLineArguments();
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
index 8247fd5f3..f5e36e864 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineServiceImpl.java
@@ -77,8 +77,8 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
private AxArtifactKey engineServiceKey = null;
// The Apex engine workers this engine service is handling
- private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
- .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
+ private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
+ .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
// Event queue for events being sent into the Apex engines, it used by all engines within a
// group.
@@ -342,9 +342,17 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
}
// Update the engines
- for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
+ boolean isSubsequentInstance = false;
+ for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
- engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
+ EngineWorker engineWorker = engineWorkerEntry.getValue();
+ if (isSubsequentInstance) {
+ // set subsequentInstance flag as true if the current engine worker instance is not the first one
+ // first engine instance will have this flag as false
+ engineWorker.setSubsequentInstance(true);
+ }
+ engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
+ isSubsequentInstance = true;
}
// start all engines on this engine service if it was not stopped before the update
@@ -355,7 +363,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
}
// Check if all engines are running
final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
- for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
+ for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
&& engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
@@ -387,7 +395,7 @@ public final class EngineServiceImpl implements EngineService, EngineServiceEven
}
// Check if all engines are stopped
final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
- for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
+ for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
notStoppedEngineIdBuilder.append('(');
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
index a24d9618a..e00515bd0 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/runtime/impl/EngineWorker.java
@@ -6,15 +6,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -25,7 +25,6 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
@@ -33,7 +32,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
-
+import lombok.Setter;
import org.onap.policy.apex.context.ContextException;
import org.onap.policy.apex.context.ContextRuntimeException;
import org.onap.policy.apex.context.SchemaHelper;
@@ -98,6 +97,9 @@ final class EngineWorker implements EngineService {
// Converts ApexEvent instances to and from EnEvent instances
private ApexEvent2EnEventConverter apexEnEventConverter = null;
+ @Setter
+ private boolean isSubsequentInstance;
+
/**
* Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
* {@link ApexModelReader} instance to read Apex models using JAXB.
@@ -236,9 +238,8 @@ final class EngineWorker implements EngineService {
}
}
}
-
// Update the Apex model in the Apex engine
- engine.updateModel(apexModel);
+ engine.updateModel(apexModel, isSubsequentInstance);
LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
LOGGER.exit();
@@ -613,7 +614,7 @@ final class EngineWorker implements EngineService {
/**
* Debug the event if debug is enabled.
- *
+ *
* @param event the event to debug
*/
private void debugEventIfDebugEnabled(ApexEvent event) {
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
index c1ef50bd7..553ed18c6 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/parameters/ApexParameterHandler.java
@@ -130,10 +130,10 @@ public class ApexParameterHandler {
/**
* Register all the incoming parameters with the parameter service.
- *
+ *
* @param parameters The parameters to register
*/
- private void registerParameters(ApexParameters parameters) {
+ public void registerParameters(ApexParameters parameters) {
ParameterService.register(parameters);
ParameterService.register(parameters.getEngineServiceParameters());
ParameterService.register(parameters.getEngineServiceParameters().getEngineParameters());