diff options
author | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-06-22 15:55:46 +0100 |
---|---|---|
committer | a.sreekumar <ajith.sreekumar@bell.ca> | 2021-06-28 10:36:48 +0100 |
commit | 926646d8e5e86e680a119360f93d7bdb46c89435 (patch) | |
tree | 7ff431e59672b19f658faed87c80b4d147253c9c /core/core-engine/src | |
parent | 63637d939697451d3ac63216d938780a157ff895 (diff) |
Changes to support multiple outputs from a state
This review addresses two main changes:
1) inputFields and outputFields are not tied to task definition anymore.
Instead inputEvent and outputEvents associated to a task is populated
as part of the policy state definition, as the state definition have
the information anyway.
- Clean up of the usage of inputFields and outputFields in task
definition will happen in a future review
- inputFields and outputFields defined in task definition in
policies until honolulu will not make the policy invalid as the
changes are done in backward compatible way.
2) Multiple output events can come out of a final state now.
- Define another policy state output with the relevant eventName in
the command file
- In the task logic, create a map to store the fields of the relevant
outputEvent, and then just call
"executor.addFieldsToOutput(<the_map_of_fields>)"
These 2 steps are enough to send multiple events to relevant
components as per the apex configuration.
Change-Id: Id88ca402704106404f529e595e1a76f6bf167876
Issue-ID: POLICY-3336
Signed-off-by: a.sreekumar <ajith.sreekumar@bell.ca>
Diffstat (limited to 'core/core-engine/src')
14 files changed, 434 insertions, 227 deletions
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java index fd5fe131f..878d5277a 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,13 @@ package org.onap.policy.apex.core.engine.engine.impl; import static org.onap.policy.common.utils.validation.Assertions.argumentNotNull; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import org.onap.policy.apex.context.ContextAlbum; import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.core.engine.context.ApexInternalContext; @@ -41,7 +46,12 @@ import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineStats; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxState; +import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput; +import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskReference; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -122,6 +132,8 @@ public class ApexEngineImpl implements ApexEngine { } } + populateIoEventsToTask(apexModel); + // Create new internal context or update the existing one try { if (internalContext == null) { @@ -149,6 +161,48 @@ public class ApexEngineImpl implements ApexEngine { LOGGER.exit(UPDATE_MODEL + key.getId()); } + + private void populateIoEventsToTask(AxPolicyModel apexPolicyModel) { + Set<AxArtifactKey> updatedTasks = new TreeSet<>(); + for (var axPolicy : apexPolicyModel.getPolicies().getPolicyMap().values()) { + for (var axState : axPolicy.getStateMap().values()) { + AxEvent triggerEvent = apexPolicyModel.getEvents().get(axState.getTrigger()); + axState.getTaskReferences().forEach((taskKey, taskRef) -> { + AxTask task = apexPolicyModel.getTasks().getTaskMap().get(taskKey); + task.setInputEvent(triggerEvent); + updateTaskBasedOnStateOutput(apexPolicyModel, updatedTasks, axState, taskKey, taskRef, task); + updatedTasks.add(taskKey); + }); + } + } + } + + private void updateTaskBasedOnStateOutput(AxPolicyModel apexPolicyModel, Set<AxArtifactKey> updatedTasks, + AxState state, AxArtifactKey taskKey, AxStateTaskReference taskRef, AxTask task) { + Map<String, AxEvent> outputEvents = new TreeMap<>(); + AxStateOutput stateOutput = state.getStateOutputs().get(taskRef.getOutput().getLocalName()); + if (null == stateOutput.getOutgoingEventSet() || stateOutput.getOutgoingEventSet().isEmpty()) { + Set<AxArtifactKey> outEventSet = new TreeSet<>(); + outEventSet.add(stateOutput.getOutgoingEvent()); + stateOutput.setOutgoingEventSet(outEventSet); + } + if (state.getNextStateSet().isEmpty() + || state.getNextStateSet().contains(AxReferenceKey.getNullKey().getLocalName())) { + stateOutput.getOutgoingEventSet().forEach(outgoingEventKey -> outputEvents.put(outgoingEventKey.getName(), + apexPolicyModel.getEvents().get(outgoingEventKey))); + } else { + AxArtifactKey outgoingEventKey = stateOutput.getOutgoingEvent(); + outputEvents.put(outgoingEventKey.getName(), apexPolicyModel.getEvents().get(outgoingEventKey)); + } + if (updatedTasks.contains(taskKey)) { + // this happens only when same task is used by multiple policies + // with different eventName but same fields + task.getOutputEvents().putAll(outputEvents); + } else { + task.setOutputEvents(outputEvents); + } + } + /** * {@inheritDoc}. */ @@ -288,7 +342,7 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public boolean handleEvent(final EnEvent incomingEvent) { - boolean ret = false; + var ret = false; if (incomingEvent == null) { LOGGER.warn("handleEvent()<-{},{}, cannot run engine, incoming event is null", key.getId(), state); return ret; @@ -307,17 +361,17 @@ public class ApexEngineImpl implements ApexEngine { LOGGER.debug(message); // By default we return a null event on errors - EnEvent outgoingEvent = null; + Collection<EnEvent> outgoingEvents = null; try { engineStats.executionEnter(incomingEvent.getKey()); - outgoingEvent = stateMachineHandler.execute(incomingEvent); + outgoingEvents = stateMachineHandler.execute(incomingEvent); engineStats.executionExit(); ret = true; } catch (final StateMachineException e) { LOGGER.warn("handleEvent()<-{},{}, engine execution error: ", key.getId(), state, e); // Create an exception return event - outgoingEvent = createExceptionEvent(incomingEvent, e); + outgoingEvents = createExceptionEvent(incomingEvent, e); } // Publish the outgoing event @@ -325,10 +379,12 @@ public class ApexEngineImpl implements ApexEngine { synchronized (eventListeners) { if (eventListeners.isEmpty()) { LOGGER.debug("handleEvent()<-{},{}, There is no listener registered to recieve outgoing event: {}", - key.getId(), state, outgoingEvent); + key.getId(), state, outgoingEvents); } for (final EnEventListener axEventListener : eventListeners.values()) { - axEventListener.onEnEvent(outgoingEvent); + for (var outgoingEvent : outgoingEvents) { + axEventListener.onEnEvent(outgoingEvent); + } } } } catch (final ApexException e) { @@ -398,7 +454,7 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public AxEngineModel getEngineStatus() { - final AxEngineModel engineModel = new AxEngineModel(key); + final var engineModel = new AxEngineModel(key); engineModel.setTimestamp(System.currentTimeMillis()); engineModel.setState(state); engineModel.setStats(engineStats); @@ -440,14 +496,14 @@ public class ApexEngineImpl implements ApexEngine { * @param eventException The exception that was thrown * @return the exception event */ - private EnEvent createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) { + private Set<EnEvent> createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) { // The exception event is a clone of the incoming event with the exception suffix added to // its name and an extra // field "ExceptionMessage" added final EnEvent exceptionEvent = (EnEvent) incomingEvent.clone(); // Create the cascaded message string - final StringBuilder exceptionMessageStringBuilder = new StringBuilder(); + final var exceptionMessageStringBuilder = new StringBuilder(); exceptionMessageStringBuilder.append(eventException.getMessage()); Throwable subException = eventException.getCause(); @@ -460,6 +516,6 @@ public class ApexEngineImpl implements ApexEngine { // Set the exception message on the event exceptionEvent.setExceptionMessage(exceptionMessageStringBuilder.toString()); - return exceptionEvent; + return Set.of(exceptionEvent); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java index 32f86e465..c173d1f09 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +21,9 @@ package org.onap.policy.apex.core.engine.engine.impl; +import java.util.Collection; import java.util.HashMap; +import java.util.Set; import org.onap.policy.apex.core.engine.context.ApexInternalContext; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.ExecutorFactory; @@ -79,8 +82,7 @@ public class StateMachineHandler { // Iterate over the policies in the policy model and create a state machine for each one for (final AxPolicy policy : ModelService.getModel(AxPolicies.class).getPolicyMap().values()) { // Create a state machine for this policy - final StateMachineExecutor thisStateMachineExecutor = - new StateMachineExecutor(executorFactory, policy.getKey()); + final var thisStateMachineExecutor = new StateMachineExecutor(executorFactory, policy.getKey()); // This executor is the top executor so has no parent thisStateMachineExecutor.setContext(null, policy, internalContext); @@ -90,8 +92,7 @@ public class StateMachineHandler { .get(policy.getStateMap().get(policy.getFirstState()).getTrigger()); // Put the state machine executor on the map for this trigger - final StateMachineExecutor lastStateMachineExecutor = - stateMachineExecutorMap.put(triggerEvent, thisStateMachineExecutor); + final var lastStateMachineExecutor = stateMachineExecutorMap.put(triggerEvent, thisStateMachineExecutor); if (lastStateMachineExecutor != null && lastStateMachineExecutor.getSubject() != thisStateMachineExecutor.getSubject()) { LOGGER.error("No more than one policy in a model can have the same trigger event. In model " @@ -138,32 +139,32 @@ public class StateMachineHandler { * @return The result of the state machine execution run * @throws StateMachineException On execution errors in a state machine */ - protected EnEvent execute(final EnEvent event) throws StateMachineException { + protected Collection<EnEvent> execute(final EnEvent event) throws StateMachineException { LOGGER.entry("execute()->" + event.getName()); // Try to execute the state machine for the trigger - final StateMachineExecutor stateMachineExecutor = stateMachineExecutorMap.get(event.getAxEvent()); + final var stateMachineExecutor = stateMachineExecutorMap.get(event.getAxEvent()); if (stateMachineExecutor == null) { final String exceptionMessage = - "state machine execution not possible, policy not found for trigger event " + event.getName(); + "state machine execution not possible, policy not found for trigger event " + event.getName(); LOGGER.warn(exceptionMessage); event.setExceptionMessage(exceptionMessage); - return event; + return Set.of(event); } // Run the state machine try { LOGGER.debug("execute(): state machine \"{}\" execution starting . . .", stateMachineExecutor); - final EnEvent outputObject = - stateMachineExecutor.execute(event.getExecutionId(), event.getExecutionProperties(), event); + final Collection<EnEvent> outputEvents = + stateMachineExecutor.execute(event.getExecutionId(), event.getExecutionProperties(), event); LOGGER.debug("execute()<-: state machine \"{}\" execution completed", stateMachineExecutor); - return outputObject; + return outputEvents; } catch (final Exception e) { LOGGER.warn("execute()<-: state machine \"" + stateMachineExecutor + "\" execution failed", e); throw new StateMachineException("execute()<-: execution failed on state machine " + stateMachineExecutor, - e); + e); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java index b04c354e5..11d0aa1c9 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -196,7 +197,7 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap // Execute the task final TreeMap<String, Object> incomingValues = new TreeMap<>(); incomingValues.putAll(incomingEvent); - final Map<String, Object> taskExecutionResultMap = + final Map<String, Map<String, Object>> taskExecutionResultMap = taskExecutorMap.get(taskKey).execute(executionId, executionProperties, incomingValues); final AxTask task = taskExecutorMap.get(taskKey).getSubject(); @@ -215,8 +216,9 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap // Execute the state finalizer logic to select a state output and to adjust the // taskExecutionResultMap - stateOutputName = - finalizerLogicExecutor.execute(executionId, executionProperties, taskExecutionResultMap); + // Multiple event outputs are possible only from final state, otherwise there will be only 1 outputevent + stateOutputName = finalizerLogicExecutor.execute(executionId, executionProperties, + taskExecutionResultMap.values().iterator().next()); } // Now look up the the actual state output @@ -227,19 +229,19 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap } // Create the state output and transfer all the fields across to its event - final StateOutput stateOutput = new StateOutput(stateOutputDefinition); + final var stateOutput = new StateOutput(stateOutputDefinition); this.lastStateOutput = stateOutput; - stateOutput.setEventFields(task.getRawOutputFields(), taskExecutionResultMap); + stateOutput.setEventFields(task.getOutputEvents(), taskExecutionResultMap); // Copy across fields from the incoming event that are not set on the outgoing event stateOutput.copyUnsetFields(incomingEvent); // Set the ExecutionID for the outgoing event to the value in the incoming event. - if (stateOutput.getOutputEvent() != null) { - stateOutput.getOutputEvent().setExecutionId(incomingEvent.getExecutionId()); - stateOutput.getOutputEvent().setExecutionProperties(incomingEvent.getExecutionProperties()); - } + stateOutput.getOutputEvents().values().forEach(outputEvent -> { + outputEvent.setExecutionId(incomingEvent.getExecutionId()); + outputEvent.setExecutionProperties(incomingEvent.getExecutionProperties()); + }); // That's it, the state execution is complete return stateOutput; diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java index 67a3e6d79..52429a215 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,8 @@ package org.onap.policy.apex.core.engine.executor; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.TreeMap; @@ -41,7 +44,7 @@ import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput; * @author Sven van der Meer (sven.van.der.meer@ericsson.com) * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> { +public class StateMachineExecutor implements Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> { // The Apex Policy and context for this state machine private AxPolicy axPolicy = null; private Executor<?, ?, ?, ?> parent = null; @@ -54,7 +57,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy private StateExecutor firstExecutor = null; // The next state machine executor - private Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> nextExecutor = null; + private Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> nextExecutor = null; // The executor factory private ExecutorFactory executorFactory = null; @@ -120,8 +123,8 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public EnEvent execute(final long executionId, final Properties executionProperties, final EnEvent incomingEvent) - throws StateMachineException, ContextException { + public Collection<EnEvent> execute(final long executionId, final Properties executionProperties, + final EnEvent incomingEvent) throws StateMachineException, ContextException { // Check if there are any states on the state machine if (stateExecutorMap.size() == 0) { throw new StateMachineException("no states defined on state machine"); @@ -139,8 +142,10 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy incomingEvent.getKey(), firstExecutor.getSubject().getKey()), incomingEvent); while (true) { - // Execute the state, it returns an output or throws an exception - stateOutput = stateExecutor.execute(executionId, executionProperties, stateOutput.getOutputEvent()); + // OutputEventSet in a stateoutput can contain multiple events only when it is of the final state + // otherwise, there can be only 1 item in outputEventSet + stateOutput = stateExecutor.execute(executionId, executionProperties, + stateOutput.getOutputEvents().values().iterator().next()); // Use the next state of the state output to find if all the states have executed if (stateOutput.getNextState().equals(AxReferenceKey.getNullKey())) { @@ -155,7 +160,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy } } - return stateOutput.getOutputEvent(); + return stateOutput.getOutputEvents().values(); } /** @@ -229,15 +234,16 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public final EnEvent getOutgoing() { - return null; + public final Collection<EnEvent> getOutgoing() { + return Collections.emptyList(); } /** * {@inheritDoc}. */ @Override - public final void setNext(final Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> newNextExecutor) { + public final void setNext( + final Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> newNextExecutor) { this.nextExecutor = newNextExecutor; } @@ -245,7 +251,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public final Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> getNext() { + public final Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> getNext() { return nextExecutor; } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java index 9c3c6f893..535565415 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +24,15 @@ package org.onap.policy.apex.core.engine.executor; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import lombok.Getter; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; @@ -38,11 +46,12 @@ import org.onap.policy.common.utils.validation.Assertions; * * @author Liam Fallon (liam.fallon@ericsson.com) */ +@Getter public class StateOutput { // The state output has a state and an event private final AxStateOutput stateOutputDefinition; - private final AxEvent outputEventDef; - private final EnEvent outputEvent; + private AxEvent outputEventDef; + private final Map<AxArtifactKey, EnEvent> outputEvents; /** * Create a new state output from a state output definition. @@ -50,7 +59,7 @@ public class StateOutput { * @param axStateOutput the state output definition */ public StateOutput(final AxStateOutput axStateOutput) { - this(axStateOutput, new EnEvent(axStateOutput.getOutgingEvent())); + this(axStateOutput, new EnEvent(axStateOutput.getOutgoingEvent())); } /** @@ -64,8 +73,15 @@ public class StateOutput { Assertions.argumentNotNull(outputEvent, "outputEvent may not be null"); this.stateOutputDefinition = stateOutputDefinition; - this.outputEvent = outputEvent; - outputEventDef = ModelService.getModel(AxEvents.class).get(stateOutputDefinition.getOutgingEvent()); + this.outputEvents = new TreeMap<>(); + if (stateOutputDefinition.getOutgoingEventSet() != null + && !stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + stateOutputDefinition.getOutgoingEventSet() + .forEach(outEvent -> outputEvents.put(outEvent, new EnEvent(outEvent))); + } else { + outputEvents.put(outputEvent.getKey(), outputEvent); + } + outputEventDef = ModelService.getModel(AxEvents.class).get(stateOutputDefinition.getOutgoingEvent()); } /** @@ -78,52 +94,89 @@ public class StateOutput { } /** - * Gets the state output definition. - * - * @return the state output definition - */ - public AxStateOutput getStateOutputDefinition() { - return stateOutputDefinition; - } - - /** - * Gets the output event. - * - * @return the output event - */ - public EnEvent getOutputEvent() { - return outputEvent; - } - - /** * Transfer the fields from the incoming field map into the event. * - * @param incomingFieldDefinitionMap definitions of the incoming fields - * @param eventFieldMap the event field map + * @param incomingEventDefinitionMap definitions of the incoming fields + * @param eventFieldMaps the event field map * @throws StateMachineException on errors populating the event fields */ - public void setEventFields(final Map<String, AxField> incomingFieldDefinitionMap, - final Map<String, Object> eventFieldMap) throws StateMachineException { - Assertions.argumentNotNull(incomingFieldDefinitionMap, "incomingFieldDefinitionMap may not be null"); - Assertions.argumentNotNull(eventFieldMap, "eventFieldMap may not be null"); - - if (!incomingFieldDefinitionMap.keySet().equals(eventFieldMap.keySet())) { - throw new StateMachineException( - "field definitions and values do not match for event " + outputEventDef.getId() + '\n' - + incomingFieldDefinitionMap.keySet() + '\n' + eventFieldMap.keySet()); + public void setEventFields(final Map<String, AxEvent> incomingEventDefinitionMap, + final Map<String, Map<String, Object>> eventFieldMaps) throws StateMachineException { + Assertions.argumentNotNull(incomingEventDefinitionMap, "incomingFieldDefinitionMap may not be null"); + Assertions.argumentNotNull(eventFieldMaps, "eventFieldMaps may not be null"); + + for (Entry<String, AxEvent> incomingEventDefinitionEntry : incomingEventDefinitionMap.entrySet()) { + String eventName = incomingEventDefinitionEntry.getKey(); + AxEvent eventDef = incomingEventDefinitionEntry.getValue(); + if (!eventDef.getParameterMap().keySet().equals(eventFieldMaps.get(eventName).keySet())) { + throw new StateMachineException( + "field definitions and values do not match for event " + eventDef.getId() + '\n' + + eventDef.getParameterMap().keySet() + '\n' + eventFieldMaps.get(eventName).keySet()); + } + } + var updateOnceFlag = false; + if (!outputEvents.keySet().stream().map(AxArtifactKey::getName).collect(Collectors.toSet()) + .equals(eventFieldMaps.keySet())) { + // when same task is used by multiple policies with different eventName but same fields, + // state outputs and task output events may be different + // in this case, update the output fields in the state output only once to avoid overwriting. + updateOnceFlag = true; } - for (final Entry<String, Object> incomingFieldEntry : eventFieldMap.entrySet()) { - final String fieldName = incomingFieldEntry.getKey(); - final AxField fieldDef = incomingFieldDefinitionMap.get(fieldName); + for (Entry<String, Map<String, Object>> eventFieldMapEntry : eventFieldMaps.entrySet()) { + String eventName = eventFieldMapEntry.getKey(); + Map<String, Object> outputEventFields = eventFieldMapEntry.getValue(); + AxEvent taskOutputEvent = incomingEventDefinitionMap.get(eventName); + EnEvent outputEventToUpdate = outputEvents.get(taskOutputEvent.getKey()); + + if (null == outputEventToUpdate) { + // happens only when same task is used by multiple policies with different eventName but same fields + // in this case, just match the fields and get the event in the stateOutput + Set<String> outputEventFieldNames = outputEventFields.keySet(); + Optional<EnEvent> outputEventOpt = outputEvents.values().stream().filter(outputEvent -> outputEvent + .getAxEvent().getParameterMap().keySet().equals(outputEventFieldNames)).findFirst(); + if (outputEventOpt.isEmpty()) { + throw new StateMachineException( + "Task output event field definition and state output event field doesn't match"); + } else { + outputEventToUpdate = outputEventOpt.get(); + } + } + updateOutputEventFields(taskOutputEvent, outputEventFields, outputEventToUpdate); + if (updateOnceFlag) { + break; + } + } + } + private void updateOutputEventFields(AxEvent taskOutputEvent, Map<String, Object> outputEventFields, + EnEvent outputEventToUpdate) throws StateMachineException { + for (Entry<String, Object> outputEventFieldEntry : outputEventFields.entrySet()) { + String fieldName = outputEventFieldEntry.getKey(); + Object fieldValue = outputEventFieldEntry.getValue(); + final AxField fieldDef = taskOutputEvent.getParameterMap().get(fieldName); + + Set<AxArtifactKey> outgoingEventSet = new TreeSet<>(); + if (null == stateOutputDefinition.getOutgoingEventSet() + || stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + // if current state is not the final state, then the set could be empty. + // Just take the outgoingEvent field in this case + outgoingEventSet.add(stateOutputDefinition.getOutgoingEvent()); + } else { + outgoingEventSet.addAll(stateOutputDefinition.getOutgoingEventSet()); + } // Check if this field is a field in the event - if (!outputEventDef.getFields().contains(fieldDef)) { - throw new StateMachineException("field \"" + fieldName + "\" does not exist on event \"" - + outputEventDef.getId() + "\""); + for (AxArtifactKey outputEventKey : outgoingEventSet) { + if (outputEventKey.equals(taskOutputEvent.getKey())) { + outputEventDef = ModelService.getModel(AxEvents.class).get(outputEventKey); + // Check if this field is a field in the state output event + if (!outputEventDef.getFields().contains(fieldDef)) { + throw new StateMachineException( + "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getId() + "\""); + } + } } - - // Set the value in the output event - outputEvent.put(fieldName, incomingFieldEntry.getValue()); + // Set the value in the correct output event + outputEventToUpdate.put(fieldName, fieldValue); } } @@ -135,25 +188,30 @@ public class StateOutput { */ public void copyUnsetFields(final EnEvent incomingEvent) { Assertions.argumentNotNull(incomingEvent, "incomingEvent may not be null"); - - for (final Entry<String, Object> incomingField : incomingEvent.entrySet()) { - final String fieldName = incomingField.getKey(); - - // Check if the field exists on the outgoing event - if ((!outputEventDef.getParameterMap().containsKey(fieldName)) - - // Check if the field is set on the outgoing event - || (outputEvent.containsKey(fieldName)) - - // Now, check the fields have the same type - || (!incomingEvent.getAxEvent().getParameterMap().get(fieldName) - .equals(outputEvent.getAxEvent().getParameterMap().get(fieldName)))) { - continue; - } - - // All checks done, we can copy the value - outputEvent.put(fieldName, incomingField.getValue()); + Set<AxArtifactKey> outgoingEventSet = new TreeSet<>(); + if (null == stateOutputDefinition.getOutgoingEventSet() + || stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + // if current state is not the final state, then the set could be empty. + // Just take the outgoingEvent field in this case + outgoingEventSet.add(stateOutputDefinition.getOutgoingEvent()); + } else { + outgoingEventSet.addAll(stateOutputDefinition.getOutgoingEventSet()); } - + incomingEvent.forEach((inFieldName, inFieldValue) -> { + for (AxArtifactKey outputEventKey : outgoingEventSet) { + outputEventDef = ModelService.getModel(AxEvents.class).get(outputEventKey); + // Check if the field exists on the outgoing event + if (!outputEventDef.getParameterMap().containsKey(inFieldName) + // Check if the field is set in the outgoing event + || outputEvents.get(outputEventKey).containsKey(inFieldName) + // Now, check the fields have the same type + || !incomingEvent.getAxEvent().getParameterMap().get(inFieldName) + .equals(outputEvents.get(outputEventKey).getAxEvent().getParameterMap().get(inFieldName))) { + continue; + } + // All checks done, we can copy the value + outputEvents.get(outputEventKey).put(inFieldName, inFieldValue); + } + }); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java index 0150b65b2..31e27d244 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,6 @@ package org.onap.policy.apex.core.engine.executor; import static org.onap.policy.common.utils.validation.Assertions.argumentOfClassNotNull; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -41,8 +41,7 @@ import org.onap.policy.apex.core.engine.executor.context.TaskExecutionContext; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; -import org.onap.policy.apex.model.eventmodel.concepts.AxInputField; -import org.onap.policy.apex.model.eventmodel.concepts.AxOutputField; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; import org.onap.policy.apex.model.policymodel.concepts.AxTask; import org.onap.policy.apex.model.policymodel.concepts.AxTaskParameter; import org.slf4j.ext.XLogger; @@ -56,7 +55,7 @@ import org.slf4j.ext.XLoggerFactory; * @author Liam Fallon (liam.fallon@ericsson.com) */ public abstract class TaskExecutor - implements Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> { + implements Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> { // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(TaskExecutor.class); @@ -67,10 +66,11 @@ public abstract class TaskExecutor // Holds the incoming and outgoing fields private Map<String, Object> incomingFields = null; - private Map<String, Object> outgoingFields = null; + private Map<String, Map<String, Object>> outgoingFieldsMap = null; // The next task executor - private Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> nextExecutor = null; + private Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextExecutor = + null; // The task execution context; contains the facades for events and context to be used by tasks // executed by this task @@ -104,7 +104,7 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Map<String, Object> execute(final long executionId, final Properties executionProperties, + public Map<String, Map<String, Object>> execute(final long executionId, final Properties executionProperties, final Map<String, Object> newIncomingFields) throws StateMachineException, ContextException { throw new StateMachineException( "execute() not implemented on abstract TaskExecutor class, only on its subclasses"); @@ -120,19 +120,13 @@ public abstract class TaskExecutor + getSubject().getKey().getId() + "," + getSubject().getTaskLogic().getLogic()); // Check that the incoming event has all the input fields for this state - final Set<String> missingTaskInputFields = new TreeSet<>(axTask.getInputFields().keySet()); + Map<String, AxField> inputEventParameterMap = axTask.getInputEvent().getParameterMap(); + final Set<String> missingTaskInputFields = new TreeSet<>(inputEventParameterMap.keySet()); missingTaskInputFields.removeAll(newIncomingFields.keySet()); // Remove fields from the set that are optional - final Set<String> optionalFields = new TreeSet<>(); - for (final Iterator<String> missingFieldIterator = missingTaskInputFields.iterator(); missingFieldIterator - .hasNext();) { - final String missingField = missingFieldIterator.next(); - if (axTask.getInputFields().get(missingField).getOptional()) { - optionalFields.add(missingField); - } - } - missingTaskInputFields.removeAll(optionalFields); + missingTaskInputFields.removeIf(missingField -> inputEventParameterMap.get(missingField).getOptional()); + if (!missingTaskInputFields.isEmpty()) { throw new StateMachineException("task input fields \"" + missingTaskInputFields + "\" are missing for task \"" + axTask.getKey().getId() + "\""); @@ -142,14 +136,15 @@ public abstract class TaskExecutor this.incomingFields = newIncomingFields; // Initiate the outgoing fields - outgoingFields = new TreeMap<>(); - for (final String outputFieldName : getSubject().getOutputFields().keySet()) { - outgoingFields.put(outputFieldName, null); + outgoingFieldsMap = new TreeMap<>(); + for (var outputEventEntry: axTask.getOutputEvents().entrySet()) { + Map<String, Object> outgoingFields = new TreeMap<>(); + outputEventEntry.getValue().getParameterMap().keySet().forEach(field -> outgoingFields.put(field, null)); + outgoingFieldsMap.put(outputEventEntry.getKey(), outgoingFields); } - // Get task context object executionContext = new TaskExecutionContext(this, executionId, executionProperties, getSubject(), getIncoming(), - getOutgoing(), getContext()); + outgoingFieldsMap.values(), getContext()); } /** @@ -167,71 +162,77 @@ public abstract class TaskExecutor throw new StateMachineException(errorMessage); } - // Copy any unset fields from the input to the output if their data type and names are - // identical - for (final String field : axTask.getOutputFields().keySet()) { - copyInputField2Output(field); - } + // Copy any unset fields from the input to the output if their data type and names are identical + axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> outputEventEntry.getValue().getParameterMap() + .keySet().forEach(field -> copyInputField2Output(outputEventEntry.getKey(), field))); // Finally, check that the outgoing fields have all the output fields defined for this state - // and, if not, output - // a list of missing fields - final Set<String> missingTaskOutputFields = new TreeSet<>(axTask.getOutputFields().keySet()); - missingTaskOutputFields.removeAll(outgoingFields.keySet()); + // and, if not, output a list of missing fields + Map<String, Set<String>> missingTaskOutputFieldsMap = new TreeMap<>(); + axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> { + Set<String> missingTaskOutputFields = new TreeSet<>(); + missingTaskOutputFields.addAll(outputEventEntry.getValue().getParameterMap().keySet()); + String key = outputEventEntry.getKey(); + missingTaskOutputFields.removeAll(outgoingFieldsMap.get(key).keySet()); + missingTaskOutputFieldsMap.put(key, missingTaskOutputFields); + }); // Remove fields from the set that are optional - final Set<String> optionalOrCopiedFields = new TreeSet<>(); - for (final Iterator<String> missingFieldIterator = missingTaskOutputFields.iterator(); missingFieldIterator - .hasNext();) { - final String missingField = missingFieldIterator.next(); - if (axTask.getInputFields().containsKey(missingField) - || axTask.getOutputFields().get(missingField).getOptional()) { - optionalOrCopiedFields.add(missingField); - } - } - missingTaskOutputFields.removeAll(optionalOrCopiedFields); - if (!missingTaskOutputFields.isEmpty()) { - throw new StateMachineException("task output fields \"" + missingTaskOutputFields - + "\" are missing for task \"" + axTask.getKey().getId() + "\""); + missingTaskOutputFieldsMap.entrySet() + .forEach(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue() + .removeIf(missingField -> axTask.getInputEvent().getParameterMap().containsKey(missingField) + || axTask.getOutputEvents().get(missingTaskOutputFieldsEntry.getKey()).getParameterMap() + .get(missingField).getOptional())); + missingTaskOutputFieldsMap.entrySet() + .removeIf(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue().isEmpty()); + if (!missingTaskOutputFieldsMap.isEmpty()) { + throw new StateMachineException("Fields for task output events \"" + missingTaskOutputFieldsMap.keySet() + + "\" are missing for task \"" + axTask.getKey().getId() + "\""); + } // Finally, check that the outgoing field map don't have any extra fields, if present, raise - // exception with the - // list of extra fields - final Set<String> extraTaskOutputFields = new TreeSet<>(outgoingFields.keySet()); - extraTaskOutputFields.removeAll(axTask.getOutputFields().keySet()); - if (!extraTaskOutputFields.isEmpty()) { - throw new StateMachineException("task output fields \"" + extraTaskOutputFields - + "\" are unwanted for task \"" + axTask.getKey().getId() + "\""); + // exception with the list of extra fields + final Map<String, Set<String>> extraTaskOutputFieldsMap = new TreeMap<>(); + outgoingFieldsMap.entrySet().forEach(outgoingFieldsEntry -> extraTaskOutputFieldsMap + .put(outgoingFieldsEntry.getKey(), new TreeSet<>(outgoingFieldsEntry.getValue().keySet()))); + extraTaskOutputFieldsMap.entrySet().forEach(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue() + .removeAll(axTask.getOutputEvents().get(extraTaskOutputFieldsEntry.getKey()).getParameterMap().keySet())); + extraTaskOutputFieldsMap.entrySet() + .removeIf(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue().isEmpty()); + if (!extraTaskOutputFieldsMap.isEmpty()) { + throw new StateMachineException("task output event \"" + extraTaskOutputFieldsMap.keySet() + + "\" contains fields that are unwanted for task \"" + axTask.getKey().getId() + "\""); } - String message = "execute-post:" + axTask.getKey().getId() + ", returning fields " + outgoingFields.toString(); + String message = + "execute-post:" + axTask.getKey().getId() + ", returning fields " + outgoingFieldsMap.toString(); LOGGER.debug(message); } /** * If the input field exists on the output and it is not set in the task, then it should be copied to the output. * + * @param eventName the event name * @param field the input field */ - private void copyInputField2Output(String field) { + private void copyInputField2Output(String eventName, String field) { + Map<String, Object> outgoingFields = outgoingFieldsMap.get(eventName); // Check if the field exists and is not set on the output - if (getOutgoing().containsKey(field) && getOutgoing().get(field) != null) { + if (outgoingFields.get(field) != null) { return; } // This field is not in the output, check if it's on the input and is the same type - // (Note here, the output - // field definition has to exist so it's not - // null checked) - final AxInputField inputFieldDef = axTask.getInputFields().get(field); - final AxOutputField outputFieldDef = axTask.getOutputFields().get(field); + // (Note here, the output field definition has to exist so it's not null checked) + final AxField inputFieldDef = axTask.getInputEvent().getParameterMap().get(field); + final AxField outputFieldDef = axTask.getOutputEvents().get(eventName).getParameterMap().get(field); if (inputFieldDef == null || !inputFieldDef.getSchema().equals(outputFieldDef.getSchema())) { return; } // We have an input field that matches our output field, copy the value across - getOutgoing().put(field, getIncoming().get(field)); + outgoingFields.put(field, getIncoming().get(field)); } /** @@ -308,15 +309,16 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Map<String, Object> getOutgoing() { - return outgoingFields; + public Map<String, Map<String, Object>> getOutgoing() { + return outgoingFieldsMap; } /** * {@inheritDoc}. */ @Override - public void setNext(final Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> nextEx) { + public void setNext( + final Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextEx) { this.nextExecutor = nextEx; } @@ -324,7 +326,7 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> getNext() { + public Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> getNext() { return nextExecutor; } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java index f3906213f..5f39bcd56 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,18 +85,34 @@ public class AxTaskFacade { */ public SchemaHelper getInFieldSchemaHelper(final String fieldName) { // Find the field for the field name - return getFieldSchemaHelper(fieldName, task.getInputFields().get(fieldName), "incoming"); + return getFieldSchemaHelper(fieldName, task.getInputEvent().getParameterMap().get(fieldName), "incoming"); } /** * Creates a schema helper for an outgoing field of this task. + * This method can be used only when there is a single event output as part of a task * * @param fieldName The name of the field to get a schema helper for * @return the schema helper for this field */ public SchemaHelper getOutFieldSchemaHelper(final String fieldName) { // Find the field for the field name - return getFieldSchemaHelper(fieldName, task.getOutputFields().get(fieldName), "outgoing"); + return getFieldSchemaHelper(fieldName, + task.getOutputEvents().values().iterator().next().getParameterMap().get(fieldName), "outgoing"); + } + + /** + * Creates a schema helper for an outgoing field of this task. + * This method can be used when there are multiple event outputs from a task + * + * @param eventName the name of the event to which the field belongs to + * @param fieldName The name of the field to get a schema helper for + * @return the schema helper for this field + */ + public SchemaHelper getOutFieldSchemaHelper(final String eventName, final String fieldName) { + // Find the field for the field name + return getFieldSchemaHelper(fieldName, task.getOutputEvents().get(eventName).getParameterMap().get(fieldName), + "outgoing"); } /** diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java index 49459dfae..a54252e05 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java @@ -3,6 +3,7 @@ * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ package org.onap.policy.apex.core.engine.executor.context; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -73,6 +75,12 @@ public class TaskExecutionContext extends AbstractExecutionContext { public final Map<String, Object> outFields; /** + * The outgoing fields from the task. The task logic can access and set these fields with its logic. A task outputs + * its result using these fields. + */ + public final Collection<Map<String, Object>> outFieldsList; + + /** * Logger for task execution, task logic can use this field to access and log to Apex logging. */ public final XLogger logger = EXECUTION_LOGGER; @@ -97,12 +105,12 @@ public class TaskExecutionContext extends AbstractExecutionContext { * @param executionProperties the execution properties for task execution * @param axTask the task definition that is the subject of execution * @param inFields the in fields - * @param outFields the out fields + * @param outFieldsList collection of the out fields * @param internalContext the execution context of the Apex engine in which the task is being executed */ public TaskExecutionContext(final TaskExecutor taskExecutor, final long executionId, final Properties executionProperties, final AxTask axTask, final Map<String, Object> inFields, - final Map<String, Object> outFields, final ApexInternalContext internalContext) { + final Collection<Map<String, Object>> outFieldsList, final ApexInternalContext internalContext) { super(executionId, executionProperties); // The subject is the task definition @@ -113,7 +121,13 @@ public class TaskExecutionContext extends AbstractExecutionContext { // The input and output fields this.inFields = Collections.unmodifiableMap(inFields); - this.outFields = outFields; + this.outFieldsList = outFieldsList; + // if only a single output event needs to fired from a task, the outFields alone can be used too + if (outFieldsList.isEmpty()) { + this.outFields = new TreeMap<>(); + } else { + this.outFields = outFieldsList.iterator().next(); + } // Set up the context albums for this task context = new TreeMap<>(); @@ -156,7 +170,7 @@ public class TaskExecutionContext extends AbstractExecutionContext { */ public ContextAlbum getContextAlbum(final String contextAlbumName) { // Find the context album - final ContextAlbum foundContextAlbum = context.get(contextAlbumName); + final var foundContextAlbum = context.get(contextAlbumName); // Check if the context album exists if (foundContextAlbum != null) { @@ -166,4 +180,16 @@ public class TaskExecutionContext extends AbstractExecutionContext { + "\" on task \"" + subject.getId() + "\""); } } + + /** + * Method to add fields to the output event list. + * @param fields the fields to be added + */ + public void addFieldsToOutput(Map<String, Object> fields) { + for (Map<String, Object> outputFields : outFieldsList) { + if (outputFields.keySet().containsAll(fields.keySet())) { + outputFields.replaceAll((name, value) -> fields.get(name)); + } + } + } } diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/engine/impl/DummySmExecutor.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/engine/impl/DummySmExecutor.java index 18d32140b..df4d9279e 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/engine/impl/DummySmExecutor.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/engine/impl/DummySmExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,8 @@ package org.onap.policy.apex.core.engine.engine.impl; +import java.util.Collection; +import java.util.List; import java.util.Properties; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.ExecutorFactory; @@ -62,8 +65,9 @@ public class DummySmExecutor extends StateMachineExecutor { * {@inheritDoc}. */ @Override - public EnEvent execute(final long executionId, final Properties executionProperties, final EnEvent incomingEvent) { - return incomingEvent; + public Collection<EnEvent> execute(final long executionId, final Properties executionProperties, + final EnEvent incomingEvent) { + return List.of(incomingEvent); } /** diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/DummyTaskExecutor.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/DummyTaskExecutor.java index 8d1aa5f0d..75c4c0d7c 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/DummyTaskExecutor.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/DummyTaskExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,12 +28,14 @@ import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; import org.onap.policy.apex.model.policymodel.concepts.AxTask; /** * Dummy task executor for testing. */ public class DummyTaskExecutor extends TaskExecutor { + private static final String EVENT_KEY = "Event1:0.0.1"; private boolean override; public DummyTaskExecutor() { @@ -54,14 +57,14 @@ public class DummyTaskExecutor extends TaskExecutor { * {@inheritDoc}. */ @Override - public Map<String, Object> execute(final long executionId, final Properties executionProperties, + public Map<String, Map<String, Object>> execute(final long executionId, final Properties executionProperties, final Map<String, Object> newIncomingFields) throws StateMachineException, ContextException { if (!override) { super.execute(executionId, executionProperties, newIncomingFields); } - AxArtifactKey event0Key = new AxArtifactKey("Event0:0.0.1"); - return new EnEvent(event0Key); + AxArtifactKey eventKey = new AxArtifactKey(EVENT_KEY); + return Map.of(eventKey.getName(), new EnEvent(eventKey)); } /** @@ -74,7 +77,9 @@ public class DummyTaskExecutor extends TaskExecutor { } AxArtifactKey taskKey = new AxArtifactKey("FirstTask:0.0.1"); - return new AxTask(taskKey); + AxTask task = new AxTask(taskKey); + task.setOutputEvents(Map.of("Event1", new AxEvent(new AxArtifactKey(EVENT_KEY)))); + return task; } /** diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutorTest.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutorTest.java index 2acb57681..2d274bd2e 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutorTest.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutorTest.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,10 @@ package org.onap.policy.apex.core.engine.executor; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import org.junit.After; @@ -65,7 +69,7 @@ public class StateMachineExecutorTest { private ApexInternalContext internalContextMock; @Mock - private Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> nextExecutorMock; + private Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> nextExecutorMock; @Mock private ExecutorFactory executorFactoryMock; @@ -191,18 +195,18 @@ public class StateMachineExecutorTest { .hasMessage("no states defined on state machine"); executor.setContext(null, axPolicy, internalContextMock); assertEquals("Policy:0.0.1", executor.getKey().getId()); - assertEquals(null, executor.getParent()); + assertNull(executor.getParent()); assertEquals(internalContextMock, executor.getContext()); - assertEquals(null, executor.getNext()); - assertEquals(null, executor.getIncoming()); - assertEquals(null, executor.getOutgoing()); + assertNull(executor.getNext()); + assertNull(executor.getIncoming()); + assertTrue(executor.getOutgoing().isEmpty()); assertEquals(axPolicy, executor.getSubject()); executor.setParameters(new ExecutorParameters()); executor.setNext(nextExecutorMock); assertEquals(nextExecutorMock, executor.getNext()); executor.setNext(null); - assertEquals(null, executor.getNext()); + assertNull(executor.getNext()); assertThatThrownBy(() -> executor.executePre(0, null, null)) .hasMessage("execution pre work not implemented on class"); @@ -294,34 +298,37 @@ public class StateMachineExecutorTest { assertThatThrownBy(() -> output.setEventFields(null, null)) .hasMessage("incomingFieldDefinitionMap may not be null"); - Map<String, AxField> incomingFieldDefinitionMap = new LinkedHashMap<>(); + Map<String, AxEvent> incomingFieldDefinitionMap = new LinkedHashMap<>(); assertThatThrownBy(() -> output.setEventFields(incomingFieldDefinitionMap, null)) - .hasMessage("eventFieldMap may not be null"); - Map<String, Object> eventFieldMap = new LinkedHashMap<>(); - output.setEventFields(incomingFieldDefinitionMap, eventFieldMap); - - eventFieldMap.put("key", "Value"); - assertThatThrownBy(() -> output.setEventFields(incomingFieldDefinitionMap, eventFieldMap)) - .hasMessage("field definitions and values do not match for event Event1:0.0.1\n[]\n[key]"); - AxField axBadFieldDefinition = new AxField(); - incomingFieldDefinitionMap.put("key", axBadFieldDefinition); - assertThatThrownBy(() -> output.setEventFields(incomingFieldDefinitionMap, eventFieldMap)) + .hasMessage("eventFieldMaps may not be null"); + Map<String, Map<String, Object>> eventFieldMaps = new LinkedHashMap<>(); + output.setEventFields(incomingFieldDefinitionMap, eventFieldMaps); + AxEvent event = new AxEvent(new AxArtifactKey("Event1", "0.0.1")); + event.setParameterMap(Map.of("key", new AxField())); + incomingFieldDefinitionMap.put("Event1", event); + eventFieldMaps.put("Event1", Map.of("key2", "value")); + assertThatThrownBy(() -> output.setEventFields(incomingFieldDefinitionMap, eventFieldMaps)) + .hasMessage("field definitions and values do not match for event Event1:0.0.1\n[key]\n[key2]"); + + eventFieldMaps.put("Event1", Map.of("key", "value")); + assertThatThrownBy(() -> output.setEventFields(incomingFieldDefinitionMap, eventFieldMaps)) .hasMessage("field \"key\" does not exist on event \"Event1:0.0.1\""); + incomingFieldDefinitionMap.clear(); - eventFieldMap.clear(); + eventFieldMaps.clear(); AxArtifactKey stringSchemaKey = new AxArtifactKey("StringSchema:0.0.1"); AxReferenceKey fieldKey = new AxReferenceKey("Event1:0.0.1:event:Field0"); AxField event1Field0Definition = new AxField(fieldKey, stringSchemaKey); - incomingFieldDefinitionMap.put("Event1Field0", event1Field0Definition); - eventFieldMap.put("Event1Field0", "Value"); - output.setEventFields(incomingFieldDefinitionMap, eventFieldMap); + event.setParameterMap(Map.of("Event1Field0", event1Field0Definition)); + incomingFieldDefinitionMap.put("Event1", event); + eventFieldMaps.put("Event1", Map.of("Event1Field0", "Value")); + output.setEventFields(incomingFieldDefinitionMap, eventFieldMaps); StateOutput outputCopy = new StateOutput(axPolicy.getStateMap().get("State0") .getStateOutputs().get("stateOutput0")); EnEvent incomingEvent = new EnEvent(new AxArtifactKey("Event0:0.0.1")); outputCopy.copyUnsetFields(incomingEvent); - incomingEvent.put("Event1Field0", "Hello"); outputCopy.copyUnsetFields(incomingEvent); } diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/TaskExecutorTest.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/TaskExecutorTest.java index bbedb886c..1ddc3f5b5 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/TaskExecutorTest.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/TaskExecutorTest.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +33,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TreeMap; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -44,6 +46,8 @@ import org.onap.policy.apex.core.engine.context.ApexInternalContext; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; import org.onap.policy.apex.model.eventmodel.concepts.AxInputField; import org.onap.policy.apex.model.eventmodel.concepts.AxOutputField; import org.onap.policy.apex.model.policymodel.concepts.AxTask; @@ -76,14 +80,16 @@ public class TaskExecutorTest { private AxOutputField axMissingOutputFieldMock; @Mock - private Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> nextExecutorMock; + private Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, + ApexInternalContext> nextExecutorMock; @Mock private AxTaskLogic taskLogicMock; - private LinkedHashMap<String, Object> inFieldMap; - private LinkedHashMap<String, Object> outFieldMap; + private Map<String, AxField> inFieldMap; + private Map<String, AxField> outFieldMap; private List<TaskParameters> taskParametersFromConfig; + private Map<String, AxEvent> outEvents = new TreeMap<>(); /** * Set up mocking. @@ -96,13 +102,17 @@ public class TaskExecutorTest { Mockito.doReturn(task0Key).when(axTaskMock).getKey(); Mockito.doReturn(task0Key.getId()).when(axTaskMock).getId(); - inFieldMap = new LinkedHashMap<>(); + inFieldMap = Map.of("InField0", axInputFieldMock, "InField1", axOptionalInputFieldMock); outFieldMap = new LinkedHashMap<>(); - inFieldMap.put("InField0", axInputFieldMock); - inFieldMap.put("InField1", axOptionalInputFieldMock); outFieldMap.put("OutField0", axOutputFieldMock); - outFieldMap.put("OutField0", axOptionalOutputFieldMock); + outFieldMap.put("OutField1", axOptionalOutputFieldMock); + + AxEvent inEvent = new AxEvent(); + inEvent.setParameterMap(inFieldMap); + AxEvent outEvent = new AxEvent(new AxArtifactKey("outputEvent:1.0.0")); + outEvent.setParameterMap(outFieldMap); + outEvents.put(outEvent.getKey().getName(), outEvent); AxArtifactKey schemaKey = new AxArtifactKey("Schema:0.0.1"); Mockito.doReturn(schemaKey).when(axInputFieldMock).getSchema(); @@ -119,6 +129,9 @@ public class TaskExecutorTest { Mockito.doReturn(outFieldMap).when(axTaskMock).getOutputFields(); Mockito.doReturn(taskLogicMock).when(axTaskMock).getTaskLogic(); + Mockito.doReturn(inEvent).when(axTaskMock).getInputEvent(); + Mockito.doReturn(outEvents).when(axTaskMock).getOutputEvents(); + Mockito.doReturn(new AxArtifactKey("Context:0.0.1")).when(internalContextMock).getKey(); Map<String, AxTaskParameter> taskParameters = new HashMap<>(); @@ -162,9 +175,6 @@ public class TaskExecutorTest { Map<String, Object> incomingFields = new LinkedHashMap<>(); - assertThatThrownBy(() -> executor.executePre(0, new Properties(), incomingFields)) - .hasMessageContaining("task input fields \"[InField0]\" are missing for task \"Task0:0.0.1\""); - incomingFields.put("InField0", "A Value"); executor.executePre(0, new Properties(), incomingFields); @@ -184,15 +194,16 @@ public class TaskExecutorTest { executor.executePost(true); outFieldMap.put("MissingField", axMissingOutputFieldMock); - - assertThatThrownBy(() -> executor.executePost(true)) - .hasMessageContaining("task output fields \"[MissingField]\" are missing for task \"Task0:0.0.1\""); + outEvents.get("outputEvent").getParameterMap().put("MissingField", axMissingOutputFieldMock); + assertThatThrownBy(() -> executor.executePost(true)).hasMessageContaining( + "Fields for task output events \"[outputEvent]\" are missing for task \"Task0:0.0.1\""); outFieldMap.remove("MissingField"); + outEvents.get("outputEvent").getParameterMap().remove("MissingField"); executor.getExecutionContext().outFields.put("BadExtraField", "Howdy!"); - assertThatThrownBy(() -> executor.executePost(true)) - .hasMessageContaining("task output fields \"[BadExtraField]\" are unwanted for task \"Task0:0.0.1\""); + assertThatThrownBy(() -> executor.executePost(true)).hasMessageContaining( + "task output event \"[outputEvent]\" contains fields that are unwanted for task \"Task0:0.0.1\""); executor.getExecutionContext().outFields.remove("BadExtraField"); outFieldMap.put("InField1", axMissingOutputFieldMock); @@ -202,6 +213,7 @@ public class TaskExecutorTest { executor.executePost(true); executor.getExecutionContext().outFields.put("InField0", "Output Value"); + outEvents.get("outputEvent").getParameterMap().put("InField0", axMissingOutputFieldMock); executor.executePost(true); executor.getExecutionContext().outFields.remove("InField0"); diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacadeTest.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacadeTest.java index 8ef78efe9..6f8402e55 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacadeTest.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacadeTest.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +26,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.util.LinkedHashMap; import java.util.Map; +import java.util.TreeMap; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -40,6 +41,8 @@ import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema; import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; import org.onap.policy.apex.model.eventmodel.concepts.AxInputField; import org.onap.policy.apex.model.eventmodel.concepts.AxOutputField; import org.onap.policy.apex.model.policymodel.concepts.AxTask; @@ -83,13 +86,18 @@ public class AxTaskFacadeTest { Mockito.doReturn(task0Key).when(axTaskMock).getKey(); Mockito.doReturn(task0Key.getId()).when(axTaskMock).getId(); - Map<String, AxInputField> inFieldMap = new LinkedHashMap<>(); - Map<String, AxOutputField> outFieldMap = new LinkedHashMap<>(); + Map<String, AxField> inFieldMap = Map.of("InField0", axInputFieldMock, "InFieldBad", axInputFieldBadMock); + Map<String, AxField> outFieldMap = Map.of("OutField0", axOutputFieldMock, "OutFieldBad", axOutputFieldBadMock); - inFieldMap.put("InField0", axInputFieldMock); - inFieldMap.put("InFieldBad", axInputFieldBadMock); - outFieldMap.put("OutField0", axOutputFieldMock); - outFieldMap.put("OutFieldBad", axOutputFieldBadMock); + AxEvent inEvent = new AxEvent(); + inEvent.setParameterMap(inFieldMap); + AxEvent outEvent = new AxEvent(new AxArtifactKey("outputEvent:1.0.0")); + outEvent.setParameterMap(outFieldMap); + Map<String, AxEvent> outEvents = new TreeMap<>(); + outEvents.put(outEvent.getKey().getName(), outEvent); + + Mockito.doReturn(inEvent).when(axTaskMock).getInputEvent(); + Mockito.doReturn(outEvents).when(axTaskMock).getOutputEvents(); Mockito.doReturn(inFieldMap).when(axTaskMock).getInputFields(); Mockito.doReturn(outFieldMap).when(axTaskMock).getOutputFields(); diff --git a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContextTest.java b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContextTest.java index f8bdc4bdd..24c504822 100644 --- a/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContextTest.java +++ b/core/core-engine/src/test/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContextTest.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +29,8 @@ import static org.junit.Assert.assertNotNull; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import org.junit.Before; @@ -93,10 +96,10 @@ public class TaskExecutionContextTest { @Test public void test() { final Map<String, Object> inFields = new LinkedHashMap<>(); - final Map<String, Object> outFields = new LinkedHashMap<>(); + final List<Map<String, Object>> outFieldsList = new LinkedList<>(); - TaskExecutionContext tec = new TaskExecutionContext(taskExecutorMock, 0, null, axTaskMock, inFields, outFields, - internalContextMock); + TaskExecutionContext tec = new TaskExecutionContext(taskExecutorMock, 0, null, axTaskMock, inFields, + outFieldsList, internalContextMock); assertNotNull(tec); tec.setMessage("TEC Message"); |