From e1085f59c1ecd68de574391dc490973abd72a731 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Sat, 4 Apr 2020 15:35:07 +0100 Subject: Fix intermittent unit test failures reseterquestor When consumers and producers are paired as in the case of the REST Rquestor, both sides must come up and be wired together in the initiation phase of apex-pdp before the consumers and producers start handling envents. In the ApexActivator class, the consumers were started immediately after they were initialized meaning that a consumer could return events to a producer that had not started yet. This change fixes the ApexActivator so that it waits until all consumers and producers are initialized before starting event handling. It also fixes the timings on RestRequestor tests and tidies up the unit tests. Issue-ID: POLICY-2469 Change-Id: Ib66d9531bf21f2a879ab33795aded4f48e7bfbc6 Signed-off-by: liamfallon --- .../apex/service/engine/main/ApexActivator.java | 81 +++++++++++++--------- 1 file changed, 49 insertions(+), 32 deletions(-) (limited to 'services/services-engine/src/main/java/org') diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 205b865d5..fddbcb79f 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -106,7 +106,7 @@ public class ApexActivator { ApexParameters apexParameters = apexParametersMap.values().iterator().next(); // totalInstanceCount is the sum of instance counts required as per each policy int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); + .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); instantiateEngine(apexParameters); setUpModelMarhsallerAndUnmarshaller(apexParameters); @@ -126,12 +126,12 @@ public class ApexActivator { for (Entry apexParamsEntry : apexParametersMap.entrySet()) { ApexParameters apexParams = apexParamsEntry.getValue(); boolean duplicateInputParameterExist = - apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); + apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); boolean duplicateOutputParameterExist = - apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); + apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); if (duplicateInputParameterExist || duplicateOutputParameterExist) { LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.", - apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); + apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); apexParametersMap.remove(apexParamsEntry.getKey()); continue; } @@ -140,22 +140,29 @@ public class ApexActivator { // Check if a policy model file has been specified if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .", - apexParams.getEngineServiceParameters().getPolicyModelFileName()); + apexParams.getEngineServiceParameters().getPolicyModelFileName()); - final String policyModelString = TextFileUtils - .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); + final String policyModelString = + TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } } AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + // Set the policy model in the engine apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + true); + setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + outputParametersMap); + + // Wire up pairings between marhsallers and unmarshallers setUpMarshalerPairings(inputParametersMap); + + // Start event processing + startUnmarshallers(inputParametersMap); } private AxPolicyModel aggregatePolicyModels(Map policyModelsMap) { @@ -163,42 +170,43 @@ public class ApexActivator { ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey()); AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue()); Stream> policyModelStream = - policyModelsMap.entrySet().stream().skip(1); + policyModelsMap.entrySet().stream().skip(1); Entry finalPolicyModelEntry = - policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { - try { - entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), - true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - })); + policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { + try { + entry1.setValue( + PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); + } catch (ApexModelException exc) { + LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", + entry2.getKey().getName(), entry2.getKey().getVersion(), exc); + apexParametersMap.remove(entry2.getKey()); + policyModelsMap.remove(entry2.getKey()); + } + return entry1; + })); AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue()); policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap return finalPolicyModel; } private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, - Map inputParametersMap, - Map outputParametersMap) throws ApexEventException { + Map inputParametersMap, Map outputParametersMap) + throws ApexEventException { // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry outputParameters : outputParametersMap.entrySet()) { final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(), - engineServiceParameters, outputParameters.getValue()); + engineServiceParameters, outputParameters.getValue()); marshaller.init(); apexEngineService.registerActionListener(outputParameters.getKey(), marshaller); marshallerMap.put(outputParameters.getKey(), marshaller); } + // Consumer parameters specify what event unmarshalers to handle events coming into Apex // are set up and how they are set up for (final Entry inputParameters : inputParametersMap.entrySet()) { final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(), - engineServiceParameters, inputParameters.getValue()); + engineServiceParameters, inputParameters.getValue()); unmarshallerMap.put(inputParameters.getKey(), unmarshaller); unmarshaller.init(engineServiceHandler); } @@ -206,7 +214,7 @@ public class ApexActivator { private void instantiateEngine(ApexParameters apexParameters) throws ApexException { if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -216,7 +224,7 @@ public class ApexActivator { // Instantiate and start the messaging service for Deployment LOGGER.debug("starting apex deployment service . . ."); final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService, - apexParameters.getEngineServiceParameters().getDeploymentPort()); + apexParameters.getEngineServiceParameters().getDeploymentPort()); engDepService.start(); // Create the engine holder to hold the engine's references and act as an event receiver @@ -240,14 +248,23 @@ public class ApexActivator { if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler final ApexEventMarshaller peeredMarshaler = - marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); } } - // Now let's get events flowing - unmarshaller.start(); + } + } + + /** + * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done. + * + * @param inputParametersMap the apex parameters + */ + private void startUnmarshallers(Map inputParametersMap) { + for (final Entry inputParameters : inputParametersMap.entrySet()) { + unmarshallerMap.get(inputParameters.getKey()).start(); } } -- cgit 1.2.3-korg