aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-engine
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java81
1 files changed, 49 insertions, 32 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index 205b865d5..fddbcb79f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -106,7 +106,7 @@ public class ApexActivator {
ApexParameters apexParameters = apexParametersMap.values().iterator().next();
// totalInstanceCount is the sum of instance counts required as per each policy
int totalInstanceCount = apexParametersMap.values().stream()
- .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
+ .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
instantiateEngine(apexParameters);
setUpModelMarhsallerAndUnmarshaller(apexParameters);
@@ -126,12 +126,12 @@ public class ApexActivator {
for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
ApexParameters apexParams = apexParamsEntry.getValue();
boolean duplicateInputParameterExist =
- apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
+ apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
boolean duplicateOutputParameterExist =
- apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
+ apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
if (duplicateInputParameterExist || duplicateOutputParameterExist) {
LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
- apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
+ apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
apexParametersMap.remove(apexParamsEntry.getKey());
continue;
}
@@ -140,22 +140,29 @@ public class ApexActivator {
// Check if a policy model file has been specified
if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
- apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ apexParams.getEngineServiceParameters().getPolicyModelFileName());
- final String policyModelString = TextFileUtils
- .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ final String policyModelString =
+ TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
AxPolicyModel policyModel = EngineServiceImpl
- .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+ .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
}
}
AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+
// Set the policy model in the engine
apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
- true);
+ true);
+
setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
- outputParametersMap);
+ outputParametersMap);
+
+ // Wire up pairings between marhsallers and unmarshallers
setUpMarshalerPairings(inputParametersMap);
+
+ // Start event processing
+ startUnmarshallers(inputParametersMap);
}
private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
@@ -163,42 +170,43 @@ public class ApexActivator {
ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
- policyModelsMap.entrySet().stream().skip(1);
+ policyModelsMap.entrySet().stream().skip(1);
Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
- policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
- try {
- entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(),
- true, true));
- } catch (ApexModelException exc) {
- LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
- entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
- apexParametersMap.remove(entry2.getKey());
- policyModelsMap.remove(entry2.getKey());
- }
- return entry1;
- }));
+ policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
+ try {
+ entry1.setValue(
+ PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
+ } catch (ApexModelException exc) {
+ LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+ entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
+ apexParametersMap.remove(entry2.getKey());
+ policyModelsMap.remove(entry2.getKey());
+ }
+ return entry1;
+ }));
AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
return finalPolicyModel;
}
private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
- Map<String, EventHandlerParameters> inputParametersMap,
- Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException {
+ Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
+ throws ApexEventException {
// Producer parameters specify what event marshalers to handle events leaving Apex are
// set up and how they are set up
for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
- engineServiceParameters, outputParameters.getValue());
+ engineServiceParameters, outputParameters.getValue());
marshaller.init();
apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
marshallerMap.put(outputParameters.getKey(), marshaller);
}
+
// Consumer parameters specify what event unmarshalers to handle events coming into Apex
// are set up and how they are set up
for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
- engineServiceParameters, inputParameters.getValue());
+ engineServiceParameters, inputParameters.getValue());
unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
unmarshaller.init(engineServiceHandler);
}
@@ -206,7 +214,7 @@ public class ApexActivator {
private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
if (null != apexEngineService
- && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
+ && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
throw new ApexException("Apex Engine already initialized.");
}
// Create engine with specified thread count
@@ -216,7 +224,7 @@ public class ApexActivator {
// Instantiate and start the messaging service for Deployment
LOGGER.debug("starting apex deployment service . . .");
final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
- apexParameters.getEngineServiceParameters().getDeploymentPort());
+ apexParameters.getEngineServiceParameters().getDeploymentPort());
engDepService.start();
// Create the engine holder to hold the engine's references and act as an event receiver
@@ -240,14 +248,23 @@ public class ApexActivator {
if (inputParameters.getValue().isPeeredMode(peeredMode)) {
// Find the unmarshaler and marshaler
final ApexEventMarshaller peeredMarshaler =
- marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
+ marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
// Connect the unmarshaler and marshaler
unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
}
}
- // Now let's get events flowing
- unmarshaller.start();
+ }
+ }
+
+ /**
+ * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
+ *
+ * @param inputParametersMap the apex parameters
+ */
+ private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
+ for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
+ unmarshallerMap.get(inputParameters.getKey()).start();
}
}