summaryrefslogtreecommitdiffstats
path: root/services/services-engine
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2020-04-04 15:35:07 +0100
committerliamfallon <liam.fallon@est.tech>2020-04-05 13:10:18 +0100
commite1085f59c1ecd68de574391dc490973abd72a731 (patch)
tree574cd253996b6aaec30a7b44b76fd776a70b2295 /services/services-engine
parent5e012ffd4e1c6b0d2ce174c74b939d37d5126f06 (diff)
Fix intermittent unit test failures reseterquestor
When consumers and producers are paired as in the case of the REST Rquestor, both sides must come up and be wired together in the initiation phase of apex-pdp before the consumers and producers start handling envents. In the ApexActivator class, the consumers were started immediately after they were initialized meaning that a consumer could return events to a producer that had not started yet. This change fixes the ApexActivator so that it waits until all consumers and producers are initialized before starting event handling. It also fixes the timings on RestRequestor tests and tidies up the unit tests. Issue-ID: POLICY-2469 Change-Id: Ib66d9531bf21f2a879ab33795aded4f48e7bfbc6 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'services/services-engine')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java81
1 files changed, 49 insertions, 32 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
index 205b865d5..fddbcb79f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java
@@ -106,7 +106,7 @@ public class ApexActivator {
ApexParameters apexParameters = apexParametersMap.values().iterator().next();
// totalInstanceCount is the sum of instance counts required as per each policy
int totalInstanceCount = apexParametersMap.values().stream()
- .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
+ .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
instantiateEngine(apexParameters);
setUpModelMarhsallerAndUnmarshaller(apexParameters);
@@ -126,12 +126,12 @@ public class ApexActivator {
for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
ApexParameters apexParams = apexParamsEntry.getValue();
boolean duplicateInputParameterExist =
- apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
+ apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
boolean duplicateOutputParameterExist =
- apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
+ apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
if (duplicateInputParameterExist || duplicateOutputParameterExist) {
LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
- apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
+ apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
apexParametersMap.remove(apexParamsEntry.getKey());
continue;
}
@@ -140,22 +140,29 @@ public class ApexActivator {
// Check if a policy model file has been specified
if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
- apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ apexParams.getEngineServiceParameters().getPolicyModelFileName());
- final String policyModelString = TextFileUtils
- .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
+ final String policyModelString =
+ TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
AxPolicyModel policyModel = EngineServiceImpl
- .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
+ .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
}
}
AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
+
// Set the policy model in the engine
apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
- true);
+ true);
+
setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
- outputParametersMap);
+ outputParametersMap);
+
+ // Wire up pairings between marhsallers and unmarshallers
setUpMarshalerPairings(inputParametersMap);
+
+ // Start event processing
+ startUnmarshallers(inputParametersMap);
}
private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
@@ -163,42 +170,43 @@ public class ApexActivator {
ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
- policyModelsMap.entrySet().stream().skip(1);
+ policyModelsMap.entrySet().stream().skip(1);
Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
- policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
- try {
- entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(),
- true, true));
- } catch (ApexModelException exc) {
- LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
- entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
- apexParametersMap.remove(entry2.getKey());
- policyModelsMap.remove(entry2.getKey());
- }
- return entry1;
- }));
+ policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
+ try {
+ entry1.setValue(
+ PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
+ } catch (ApexModelException exc) {
+ LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
+ entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
+ apexParametersMap.remove(entry2.getKey());
+ policyModelsMap.remove(entry2.getKey());
+ }
+ return entry1;
+ }));
AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
return finalPolicyModel;
}
private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
- Map<String, EventHandlerParameters> inputParametersMap,
- Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException {
+ Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
+ throws ApexEventException {
// Producer parameters specify what event marshalers to handle events leaving Apex are
// set up and how they are set up
for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
- engineServiceParameters, outputParameters.getValue());
+ engineServiceParameters, outputParameters.getValue());
marshaller.init();
apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
marshallerMap.put(outputParameters.getKey(), marshaller);
}
+
// Consumer parameters specify what event unmarshalers to handle events coming into Apex
// are set up and how they are set up
for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
- engineServiceParameters, inputParameters.getValue());
+ engineServiceParameters, inputParameters.getValue());
unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
unmarshaller.init(engineServiceHandler);
}
@@ -206,7 +214,7 @@ public class ApexActivator {
private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
if (null != apexEngineService
- && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
+ && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
throw new ApexException("Apex Engine already initialized.");
}
// Create engine with specified thread count
@@ -216,7 +224,7 @@ public class ApexActivator {
// Instantiate and start the messaging service for Deployment
LOGGER.debug("starting apex deployment service . . .");
final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
- apexParameters.getEngineServiceParameters().getDeploymentPort());
+ apexParameters.getEngineServiceParameters().getDeploymentPort());
engDepService.start();
// Create the engine holder to hold the engine's references and act as an event receiver
@@ -240,14 +248,23 @@ public class ApexActivator {
if (inputParameters.getValue().isPeeredMode(peeredMode)) {
// Find the unmarshaler and marshaler
final ApexEventMarshaller peeredMarshaler =
- marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
+ marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
// Connect the unmarshaler and marshaler
unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
}
}
- // Now let's get events flowing
- unmarshaller.start();
+ }
+ }
+
+ /**
+ * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
+ *
+ * @param inputParametersMap the apex parameters
+ */
+ private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
+ for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
+ unmarshallerMap.get(inputParameters.getKey()).start();
}
}