diff options
author | liamfallon <liam.fallon@est.tech> | 2020-04-04 15:35:07 +0100 |
---|---|---|
committer | liamfallon <liam.fallon@est.tech> | 2020-04-05 13:10:18 +0100 |
commit | e1085f59c1ecd68de574391dc490973abd72a731 (patch) | |
tree | 574cd253996b6aaec30a7b44b76fd776a70b2295 /services/services-engine/src/main/java | |
parent | 5e012ffd4e1c6b0d2ce174c74b939d37d5126f06 (diff) |
Fix intermittent unit test failures reseterquestor
When consumers and producers are paired as in the case of the REST
Rquestor, both sides must come up and be wired together in the
initiation phase of apex-pdp before the consumers and producers start
handling envents.
In the ApexActivator class, the consumers were started immediately after
they were initialized meaning that a consumer could return events to a
producer that had not started yet.
This change fixes the ApexActivator so that it waits until all consumers
and producers are initialized before starting event handling.
It also fixes the timings on RestRequestor tests and tidies up the unit
tests.
Issue-ID: POLICY-2469
Change-Id: Ib66d9531bf21f2a879ab33795aded4f48e7bfbc6
Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'services/services-engine/src/main/java')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java | 81 |
1 files changed, 49 insertions, 32 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 205b865d5..fddbcb79f 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -106,7 +106,7 @@ public class ApexActivator { ApexParameters apexParameters = apexParametersMap.values().iterator().next(); // totalInstanceCount is the sum of instance counts required as per each policy int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); + .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); instantiateEngine(apexParameters); setUpModelMarhsallerAndUnmarshaller(apexParameters); @@ -126,12 +126,12 @@ public class ApexActivator { for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) { ApexParameters apexParams = apexParamsEntry.getValue(); boolean duplicateInputParameterExist = - apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); + apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); boolean duplicateOutputParameterExist = - apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); + apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); if (duplicateInputParameterExist || duplicateOutputParameterExist) { LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.", - apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); + apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); apexParametersMap.remove(apexParamsEntry.getKey()); continue; } @@ -140,22 +140,29 @@ public class ApexActivator { // Check if a policy model file has been specified if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .", - apexParams.getEngineServiceParameters().getPolicyModelFileName()); + apexParams.getEngineServiceParameters().getPolicyModelFileName()); - final String policyModelString = TextFileUtils - .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); + final String policyModelString = + TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } } AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + // Set the policy model in the engine apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + true); + setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + outputParametersMap); + + // Wire up pairings between marhsallers and unmarshallers setUpMarshalerPairings(inputParametersMap); + + // Start event processing + startUnmarshallers(inputParametersMap); } private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) { @@ -163,42 +170,43 @@ public class ApexActivator { ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey()); AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue()); Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream = - policyModelsMap.entrySet().stream().skip(1); + policyModelsMap.entrySet().stream().skip(1); Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry = - policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { - try { - entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), - true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - })); + policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { + try { + entry1.setValue( + PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); + } catch (ApexModelException exc) { + LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", + entry2.getKey().getName(), entry2.getKey().getVersion(), exc); + apexParametersMap.remove(entry2.getKey()); + policyModelsMap.remove(entry2.getKey()); + } + return entry1; + })); AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue()); policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap return finalPolicyModel; } private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, - Map<String, EventHandlerParameters> inputParametersMap, - Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException { + Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap) + throws ApexEventException { // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) { final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(), - engineServiceParameters, outputParameters.getValue()); + engineServiceParameters, outputParameters.getValue()); marshaller.init(); apexEngineService.registerActionListener(outputParameters.getKey(), marshaller); marshallerMap.put(outputParameters.getKey(), marshaller); } + // Consumer parameters specify what event unmarshalers to handle events coming into Apex // are set up and how they are set up for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(), - engineServiceParameters, inputParameters.getValue()); + engineServiceParameters, inputParameters.getValue()); unmarshallerMap.put(inputParameters.getKey(), unmarshaller); unmarshaller.init(engineServiceHandler); } @@ -206,7 +214,7 @@ public class ApexActivator { private void instantiateEngine(ApexParameters apexParameters) throws ApexException { if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -216,7 +224,7 @@ public class ApexActivator { // Instantiate and start the messaging service for Deployment LOGGER.debug("starting apex deployment service . . ."); final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService, - apexParameters.getEngineServiceParameters().getDeploymentPort()); + apexParameters.getEngineServiceParameters().getDeploymentPort()); engDepService.start(); // Create the engine holder to hold the engine's references and act as an event receiver @@ -240,14 +248,23 @@ public class ApexActivator { if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler final ApexEventMarshaller peeredMarshaler = - marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); } } - // Now let's get events flowing - unmarshaller.start(); + } + } + + /** + * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done. + * + * @param inputParametersMap the apex parameters + */ + private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) { + for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { + unmarshallerMap.get(inputParameters.getKey()).start(); } } |