diff options
Diffstat (limited to 'services/services-engine/src/main/java/org')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java | 81 |
1 files changed, 49 insertions, 32 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java index 205b865d5..fddbcb79f 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexActivator.java @@ -106,7 +106,7 @@ public class ApexActivator { ApexParameters apexParameters = apexParametersMap.values().iterator().next(); // totalInstanceCount is the sum of instance counts required as per each policy int totalInstanceCount = apexParametersMap.values().stream() - .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); + .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum(); apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount); instantiateEngine(apexParameters); setUpModelMarhsallerAndUnmarshaller(apexParameters); @@ -126,12 +126,12 @@ public class ApexActivator { for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) { ApexParameters apexParams = apexParamsEntry.getValue(); boolean duplicateInputParameterExist = - apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); + apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey); boolean duplicateOutputParameterExist = - apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); + apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey); if (duplicateInputParameterExist || duplicateOutputParameterExist) { LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.", - apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); + apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion()); apexParametersMap.remove(apexParamsEntry.getKey()); continue; } @@ -140,22 +140,29 @@ public class ApexActivator { // Check if a policy model file has been specified if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) { LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .", - apexParams.getEngineServiceParameters().getPolicyModelFileName()); + apexParams.getEngineServiceParameters().getPolicyModelFileName()); - final String policyModelString = TextFileUtils - .getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); + final String policyModelString = + TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName()); AxPolicyModel policyModel = EngineServiceImpl - .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); + .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString); policyModelsMap.put(apexParamsEntry.getKey(), policyModel); } } AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap); + // Set the policy model in the engine apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel, - true); + true); + setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap, - outputParametersMap); + outputParametersMap); + + // Wire up pairings between marhsallers and unmarshallers setUpMarshalerPairings(inputParametersMap); + + // Start event processing + startUnmarshallers(inputParametersMap); } private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) { @@ -163,42 +170,43 @@ public class ApexActivator { ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey()); AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue()); Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream = - policyModelsMap.entrySet().stream().skip(1); + policyModelsMap.entrySet().stream().skip(1); Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry = - policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { - try { - entry1.setValue(PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), - true, true)); - } catch (ApexModelException exc) { - LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", - entry2.getKey().getName(), entry2.getKey().getVersion(), exc); - apexParametersMap.remove(entry2.getKey()); - policyModelsMap.remove(entry2.getKey()); - } - return entry1; - })); + policyModelStream.reduce(firstEntry, ((entry1, entry2) -> { + try { + entry1.setValue( + PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true)); + } catch (ApexModelException exc) { + LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.", + entry2.getKey().getName(), entry2.getKey().getVersion(), exc); + apexParametersMap.remove(entry2.getKey()); + policyModelsMap.remove(entry2.getKey()); + } + return entry1; + })); AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue()); policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap return finalPolicyModel; } private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters, - Map<String, EventHandlerParameters> inputParametersMap, - Map<String, EventHandlerParameters> outputParametersMap) throws ApexEventException { + Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap) + throws ApexEventException { // Producer parameters specify what event marshalers to handle events leaving Apex are // set up and how they are set up for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) { final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(), - engineServiceParameters, outputParameters.getValue()); + engineServiceParameters, outputParameters.getValue()); marshaller.init(); apexEngineService.registerActionListener(outputParameters.getKey(), marshaller); marshallerMap.put(outputParameters.getKey(), marshaller); } + // Consumer parameters specify what event unmarshalers to handle events coming into Apex // are set up and how they are set up for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(), - engineServiceParameters, inputParameters.getValue()); + engineServiceParameters, inputParameters.getValue()); unmarshallerMap.put(inputParameters.getKey(), unmarshaller); unmarshaller.init(engineServiceHandler); } @@ -206,7 +214,7 @@ public class ApexActivator { private void instantiateEngine(ApexParameters apexParameters) throws ApexException { if (null != apexEngineService - && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { + && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) { throw new ApexException("Apex Engine already initialized."); } // Create engine with specified thread count @@ -216,7 +224,7 @@ public class ApexActivator { // Instantiate and start the messaging service for Deployment LOGGER.debug("starting apex deployment service . . ."); final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService, - apexParameters.getEngineServiceParameters().getDeploymentPort()); + apexParameters.getEngineServiceParameters().getDeploymentPort()); engDepService.start(); // Create the engine holder to hold the engine's references and act as an event receiver @@ -240,14 +248,23 @@ public class ApexActivator { if (inputParameters.getValue().isPeeredMode(peeredMode)) { // Find the unmarshaler and marshaler final ApexEventMarshaller peeredMarshaler = - marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); + marshallerMap.get(inputParameters.getValue().getPeer(peeredMode)); // Connect the unmarshaler and marshaler unmarshaller.connectMarshaler(peeredMode, peeredMarshaler); } } - // Now let's get events flowing - unmarshaller.start(); + } + } + + /** + * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done. + * + * @param inputParametersMap the apex parameters + */ + private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) { + for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) { + unmarshallerMap.get(inputParameters.getKey()).start(); } } |