diff options
Diffstat (limited to 'core/core-engine/src/main/java/org/onap')
8 files changed, 342 insertions, 174 deletions
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java index fd5fe131f..878d5277a 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,13 @@ package org.onap.policy.apex.core.engine.engine.impl; import static org.onap.policy.common.utils.validation.Assertions.argumentNotNull; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import org.onap.policy.apex.context.ContextAlbum; import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.core.engine.context.ApexInternalContext; @@ -41,7 +46,12 @@ import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineStats; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; +import org.onap.policy.apex.model.policymodel.concepts.AxState; +import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput; +import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskReference; +import org.onap.policy.apex.model.policymodel.concepts.AxTask; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; @@ -122,6 +132,8 @@ public class ApexEngineImpl implements ApexEngine { } } + populateIoEventsToTask(apexModel); + // Create new internal context or update the existing one try { if (internalContext == null) { @@ -149,6 +161,48 @@ public class ApexEngineImpl implements ApexEngine { LOGGER.exit(UPDATE_MODEL + key.getId()); } + + private void populateIoEventsToTask(AxPolicyModel apexPolicyModel) { + Set<AxArtifactKey> updatedTasks = new TreeSet<>(); + for (var axPolicy : apexPolicyModel.getPolicies().getPolicyMap().values()) { + for (var axState : axPolicy.getStateMap().values()) { + AxEvent triggerEvent = apexPolicyModel.getEvents().get(axState.getTrigger()); + axState.getTaskReferences().forEach((taskKey, taskRef) -> { + AxTask task = apexPolicyModel.getTasks().getTaskMap().get(taskKey); + task.setInputEvent(triggerEvent); + updateTaskBasedOnStateOutput(apexPolicyModel, updatedTasks, axState, taskKey, taskRef, task); + updatedTasks.add(taskKey); + }); + } + } + } + + private void updateTaskBasedOnStateOutput(AxPolicyModel apexPolicyModel, Set<AxArtifactKey> updatedTasks, + AxState state, AxArtifactKey taskKey, AxStateTaskReference taskRef, AxTask task) { + Map<String, AxEvent> outputEvents = new TreeMap<>(); + AxStateOutput stateOutput = state.getStateOutputs().get(taskRef.getOutput().getLocalName()); + if (null == stateOutput.getOutgoingEventSet() || stateOutput.getOutgoingEventSet().isEmpty()) { + Set<AxArtifactKey> outEventSet = new TreeSet<>(); + outEventSet.add(stateOutput.getOutgoingEvent()); + stateOutput.setOutgoingEventSet(outEventSet); + } + if (state.getNextStateSet().isEmpty() + || state.getNextStateSet().contains(AxReferenceKey.getNullKey().getLocalName())) { + stateOutput.getOutgoingEventSet().forEach(outgoingEventKey -> outputEvents.put(outgoingEventKey.getName(), + apexPolicyModel.getEvents().get(outgoingEventKey))); + } else { + AxArtifactKey outgoingEventKey = stateOutput.getOutgoingEvent(); + outputEvents.put(outgoingEventKey.getName(), apexPolicyModel.getEvents().get(outgoingEventKey)); + } + if (updatedTasks.contains(taskKey)) { + // this happens only when same task is used by multiple policies + // with different eventName but same fields + task.getOutputEvents().putAll(outputEvents); + } else { + task.setOutputEvents(outputEvents); + } + } + /** * {@inheritDoc}. */ @@ -288,7 +342,7 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public boolean handleEvent(final EnEvent incomingEvent) { - boolean ret = false; + var ret = false; if (incomingEvent == null) { LOGGER.warn("handleEvent()<-{},{}, cannot run engine, incoming event is null", key.getId(), state); return ret; @@ -307,17 +361,17 @@ public class ApexEngineImpl implements ApexEngine { LOGGER.debug(message); // By default we return a null event on errors - EnEvent outgoingEvent = null; + Collection<EnEvent> outgoingEvents = null; try { engineStats.executionEnter(incomingEvent.getKey()); - outgoingEvent = stateMachineHandler.execute(incomingEvent); + outgoingEvents = stateMachineHandler.execute(incomingEvent); engineStats.executionExit(); ret = true; } catch (final StateMachineException e) { LOGGER.warn("handleEvent()<-{},{}, engine execution error: ", key.getId(), state, e); // Create an exception return event - outgoingEvent = createExceptionEvent(incomingEvent, e); + outgoingEvents = createExceptionEvent(incomingEvent, e); } // Publish the outgoing event @@ -325,10 +379,12 @@ public class ApexEngineImpl implements ApexEngine { synchronized (eventListeners) { if (eventListeners.isEmpty()) { LOGGER.debug("handleEvent()<-{},{}, There is no listener registered to recieve outgoing event: {}", - key.getId(), state, outgoingEvent); + key.getId(), state, outgoingEvents); } for (final EnEventListener axEventListener : eventListeners.values()) { - axEventListener.onEnEvent(outgoingEvent); + for (var outgoingEvent : outgoingEvents) { + axEventListener.onEnEvent(outgoingEvent); + } } } } catch (final ApexException e) { @@ -398,7 +454,7 @@ public class ApexEngineImpl implements ApexEngine { */ @Override public AxEngineModel getEngineStatus() { - final AxEngineModel engineModel = new AxEngineModel(key); + final var engineModel = new AxEngineModel(key); engineModel.setTimestamp(System.currentTimeMillis()); engineModel.setState(state); engineModel.setStats(engineStats); @@ -440,14 +496,14 @@ public class ApexEngineImpl implements ApexEngine { * @param eventException The exception that was thrown * @return the exception event */ - private EnEvent createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) { + private Set<EnEvent> createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) { // The exception event is a clone of the incoming event with the exception suffix added to // its name and an extra // field "ExceptionMessage" added final EnEvent exceptionEvent = (EnEvent) incomingEvent.clone(); // Create the cascaded message string - final StringBuilder exceptionMessageStringBuilder = new StringBuilder(); + final var exceptionMessageStringBuilder = new StringBuilder(); exceptionMessageStringBuilder.append(eventException.getMessage()); Throwable subException = eventException.getCause(); @@ -460,6 +516,6 @@ public class ApexEngineImpl implements ApexEngine { // Set the exception message on the event exceptionEvent.setExceptionMessage(exceptionMessageStringBuilder.toString()); - return exceptionEvent; + return Set.of(exceptionEvent); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java index 32f86e465..c173d1f09 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/StateMachineHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +21,9 @@ package org.onap.policy.apex.core.engine.engine.impl; +import java.util.Collection; import java.util.HashMap; +import java.util.Set; import org.onap.policy.apex.core.engine.context.ApexInternalContext; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.ExecutorFactory; @@ -79,8 +82,7 @@ public class StateMachineHandler { // Iterate over the policies in the policy model and create a state machine for each one for (final AxPolicy policy : ModelService.getModel(AxPolicies.class).getPolicyMap().values()) { // Create a state machine for this policy - final StateMachineExecutor thisStateMachineExecutor = - new StateMachineExecutor(executorFactory, policy.getKey()); + final var thisStateMachineExecutor = new StateMachineExecutor(executorFactory, policy.getKey()); // This executor is the top executor so has no parent thisStateMachineExecutor.setContext(null, policy, internalContext); @@ -90,8 +92,7 @@ public class StateMachineHandler { .get(policy.getStateMap().get(policy.getFirstState()).getTrigger()); // Put the state machine executor on the map for this trigger - final StateMachineExecutor lastStateMachineExecutor = - stateMachineExecutorMap.put(triggerEvent, thisStateMachineExecutor); + final var lastStateMachineExecutor = stateMachineExecutorMap.put(triggerEvent, thisStateMachineExecutor); if (lastStateMachineExecutor != null && lastStateMachineExecutor.getSubject() != thisStateMachineExecutor.getSubject()) { LOGGER.error("No more than one policy in a model can have the same trigger event. In model " @@ -138,32 +139,32 @@ public class StateMachineHandler { * @return The result of the state machine execution run * @throws StateMachineException On execution errors in a state machine */ - protected EnEvent execute(final EnEvent event) throws StateMachineException { + protected Collection<EnEvent> execute(final EnEvent event) throws StateMachineException { LOGGER.entry("execute()->" + event.getName()); // Try to execute the state machine for the trigger - final StateMachineExecutor stateMachineExecutor = stateMachineExecutorMap.get(event.getAxEvent()); + final var stateMachineExecutor = stateMachineExecutorMap.get(event.getAxEvent()); if (stateMachineExecutor == null) { final String exceptionMessage = - "state machine execution not possible, policy not found for trigger event " + event.getName(); + "state machine execution not possible, policy not found for trigger event " + event.getName(); LOGGER.warn(exceptionMessage); event.setExceptionMessage(exceptionMessage); - return event; + return Set.of(event); } // Run the state machine try { LOGGER.debug("execute(): state machine \"{}\" execution starting . . .", stateMachineExecutor); - final EnEvent outputObject = - stateMachineExecutor.execute(event.getExecutionId(), event.getExecutionProperties(), event); + final Collection<EnEvent> outputEvents = + stateMachineExecutor.execute(event.getExecutionId(), event.getExecutionProperties(), event); LOGGER.debug("execute()<-: state machine \"{}\" execution completed", stateMachineExecutor); - return outputObject; + return outputEvents; } catch (final Exception e) { LOGGER.warn("execute()<-: state machine \"" + stateMachineExecutor + "\" execution failed", e); throw new StateMachineException("execute()<-: execution failed on state machine " + stateMachineExecutor, - e); + e); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java index b04c354e5..11d0aa1c9 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -196,7 +197,7 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap // Execute the task final TreeMap<String, Object> incomingValues = new TreeMap<>(); incomingValues.putAll(incomingEvent); - final Map<String, Object> taskExecutionResultMap = + final Map<String, Map<String, Object>> taskExecutionResultMap = taskExecutorMap.get(taskKey).execute(executionId, executionProperties, incomingValues); final AxTask task = taskExecutorMap.get(taskKey).getSubject(); @@ -215,8 +216,9 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap // Execute the state finalizer logic to select a state output and to adjust the // taskExecutionResultMap - stateOutputName = - finalizerLogicExecutor.execute(executionId, executionProperties, taskExecutionResultMap); + // Multiple event outputs are possible only from final state, otherwise there will be only 1 outputevent + stateOutputName = finalizerLogicExecutor.execute(executionId, executionProperties, + taskExecutionResultMap.values().iterator().next()); } // Now look up the the actual state output @@ -227,19 +229,19 @@ public class StateExecutor implements Executor<EnEvent, StateOutput, AxState, Ap } // Create the state output and transfer all the fields across to its event - final StateOutput stateOutput = new StateOutput(stateOutputDefinition); + final var stateOutput = new StateOutput(stateOutputDefinition); this.lastStateOutput = stateOutput; - stateOutput.setEventFields(task.getRawOutputFields(), taskExecutionResultMap); + stateOutput.setEventFields(task.getOutputEvents(), taskExecutionResultMap); // Copy across fields from the incoming event that are not set on the outgoing event stateOutput.copyUnsetFields(incomingEvent); // Set the ExecutionID for the outgoing event to the value in the incoming event. - if (stateOutput.getOutputEvent() != null) { - stateOutput.getOutputEvent().setExecutionId(incomingEvent.getExecutionId()); - stateOutput.getOutputEvent().setExecutionProperties(incomingEvent.getExecutionProperties()); - } + stateOutput.getOutputEvents().values().forEach(outputEvent -> { + outputEvent.setExecutionId(incomingEvent.getExecutionId()); + outputEvent.setExecutionProperties(incomingEvent.getExecutionProperties()); + }); // That's it, the state execution is complete return stateOutput; diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java index 67a3e6d79..52429a215 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateMachineExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +22,8 @@ package org.onap.policy.apex.core.engine.executor; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.TreeMap; @@ -41,7 +44,7 @@ import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput; * @author Sven van der Meer (sven.van.der.meer@ericsson.com) * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> { +public class StateMachineExecutor implements Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> { // The Apex Policy and context for this state machine private AxPolicy axPolicy = null; private Executor<?, ?, ?, ?> parent = null; @@ -54,7 +57,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy private StateExecutor firstExecutor = null; // The next state machine executor - private Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> nextExecutor = null; + private Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> nextExecutor = null; // The executor factory private ExecutorFactory executorFactory = null; @@ -120,8 +123,8 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public EnEvent execute(final long executionId, final Properties executionProperties, final EnEvent incomingEvent) - throws StateMachineException, ContextException { + public Collection<EnEvent> execute(final long executionId, final Properties executionProperties, + final EnEvent incomingEvent) throws StateMachineException, ContextException { // Check if there are any states on the state machine if (stateExecutorMap.size() == 0) { throw new StateMachineException("no states defined on state machine"); @@ -139,8 +142,10 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy incomingEvent.getKey(), firstExecutor.getSubject().getKey()), incomingEvent); while (true) { - // Execute the state, it returns an output or throws an exception - stateOutput = stateExecutor.execute(executionId, executionProperties, stateOutput.getOutputEvent()); + // OutputEventSet in a stateoutput can contain multiple events only when it is of the final state + // otherwise, there can be only 1 item in outputEventSet + stateOutput = stateExecutor.execute(executionId, executionProperties, + stateOutput.getOutputEvents().values().iterator().next()); // Use the next state of the state output to find if all the states have executed if (stateOutput.getNextState().equals(AxReferenceKey.getNullKey())) { @@ -155,7 +160,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy } } - return stateOutput.getOutputEvent(); + return stateOutput.getOutputEvents().values(); } /** @@ -229,15 +234,16 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public final EnEvent getOutgoing() { - return null; + public final Collection<EnEvent> getOutgoing() { + return Collections.emptyList(); } /** * {@inheritDoc}. */ @Override - public final void setNext(final Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> newNextExecutor) { + public final void setNext( + final Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> newNextExecutor) { this.nextExecutor = newNextExecutor; } @@ -245,7 +251,7 @@ public class StateMachineExecutor implements Executor<EnEvent, EnEvent, AxPolicy * {@inheritDoc}. */ @Override - public final Executor<EnEvent, EnEvent, AxPolicy, ApexInternalContext> getNext() { + public final Executor<EnEvent, Collection<EnEvent>, AxPolicy, ApexInternalContext> getNext() { return nextExecutor; } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java index 9c3c6f893..535565415 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +24,15 @@ package org.onap.policy.apex.core.engine.executor; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import lombok.Getter; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; +import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; @@ -38,11 +46,12 @@ import org.onap.policy.common.utils.validation.Assertions; * * @author Liam Fallon (liam.fallon@ericsson.com) */ +@Getter public class StateOutput { // The state output has a state and an event private final AxStateOutput stateOutputDefinition; - private final AxEvent outputEventDef; - private final EnEvent outputEvent; + private AxEvent outputEventDef; + private final Map<AxArtifactKey, EnEvent> outputEvents; /** * Create a new state output from a state output definition. @@ -50,7 +59,7 @@ public class StateOutput { * @param axStateOutput the state output definition */ public StateOutput(final AxStateOutput axStateOutput) { - this(axStateOutput, new EnEvent(axStateOutput.getOutgingEvent())); + this(axStateOutput, new EnEvent(axStateOutput.getOutgoingEvent())); } /** @@ -64,8 +73,15 @@ public class StateOutput { Assertions.argumentNotNull(outputEvent, "outputEvent may not be null"); this.stateOutputDefinition = stateOutputDefinition; - this.outputEvent = outputEvent; - outputEventDef = ModelService.getModel(AxEvents.class).get(stateOutputDefinition.getOutgingEvent()); + this.outputEvents = new TreeMap<>(); + if (stateOutputDefinition.getOutgoingEventSet() != null + && !stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + stateOutputDefinition.getOutgoingEventSet() + .forEach(outEvent -> outputEvents.put(outEvent, new EnEvent(outEvent))); + } else { + outputEvents.put(outputEvent.getKey(), outputEvent); + } + outputEventDef = ModelService.getModel(AxEvents.class).get(stateOutputDefinition.getOutgoingEvent()); } /** @@ -78,52 +94,89 @@ public class StateOutput { } /** - * Gets the state output definition. - * - * @return the state output definition - */ - public AxStateOutput getStateOutputDefinition() { - return stateOutputDefinition; - } - - /** - * Gets the output event. - * - * @return the output event - */ - public EnEvent getOutputEvent() { - return outputEvent; - } - - /** * Transfer the fields from the incoming field map into the event. * - * @param incomingFieldDefinitionMap definitions of the incoming fields - * @param eventFieldMap the event field map + * @param incomingEventDefinitionMap definitions of the incoming fields + * @param eventFieldMaps the event field map * @throws StateMachineException on errors populating the event fields */ - public void setEventFields(final Map<String, AxField> incomingFieldDefinitionMap, - final Map<String, Object> eventFieldMap) throws StateMachineException { - Assertions.argumentNotNull(incomingFieldDefinitionMap, "incomingFieldDefinitionMap may not be null"); - Assertions.argumentNotNull(eventFieldMap, "eventFieldMap may not be null"); - - if (!incomingFieldDefinitionMap.keySet().equals(eventFieldMap.keySet())) { - throw new StateMachineException( - "field definitions and values do not match for event " + outputEventDef.getId() + '\n' - + incomingFieldDefinitionMap.keySet() + '\n' + eventFieldMap.keySet()); + public void setEventFields(final Map<String, AxEvent> incomingEventDefinitionMap, + final Map<String, Map<String, Object>> eventFieldMaps) throws StateMachineException { + Assertions.argumentNotNull(incomingEventDefinitionMap, "incomingFieldDefinitionMap may not be null"); + Assertions.argumentNotNull(eventFieldMaps, "eventFieldMaps may not be null"); + + for (Entry<String, AxEvent> incomingEventDefinitionEntry : incomingEventDefinitionMap.entrySet()) { + String eventName = incomingEventDefinitionEntry.getKey(); + AxEvent eventDef = incomingEventDefinitionEntry.getValue(); + if (!eventDef.getParameterMap().keySet().equals(eventFieldMaps.get(eventName).keySet())) { + throw new StateMachineException( + "field definitions and values do not match for event " + eventDef.getId() + '\n' + + eventDef.getParameterMap().keySet() + '\n' + eventFieldMaps.get(eventName).keySet()); + } + } + var updateOnceFlag = false; + if (!outputEvents.keySet().stream().map(AxArtifactKey::getName).collect(Collectors.toSet()) + .equals(eventFieldMaps.keySet())) { + // when same task is used by multiple policies with different eventName but same fields, + // state outputs and task output events may be different + // in this case, update the output fields in the state output only once to avoid overwriting. + updateOnceFlag = true; } - for (final Entry<String, Object> incomingFieldEntry : eventFieldMap.entrySet()) { - final String fieldName = incomingFieldEntry.getKey(); - final AxField fieldDef = incomingFieldDefinitionMap.get(fieldName); + for (Entry<String, Map<String, Object>> eventFieldMapEntry : eventFieldMaps.entrySet()) { + String eventName = eventFieldMapEntry.getKey(); + Map<String, Object> outputEventFields = eventFieldMapEntry.getValue(); + AxEvent taskOutputEvent = incomingEventDefinitionMap.get(eventName); + EnEvent outputEventToUpdate = outputEvents.get(taskOutputEvent.getKey()); + + if (null == outputEventToUpdate) { + // happens only when same task is used by multiple policies with different eventName but same fields + // in this case, just match the fields and get the event in the stateOutput + Set<String> outputEventFieldNames = outputEventFields.keySet(); + Optional<EnEvent> outputEventOpt = outputEvents.values().stream().filter(outputEvent -> outputEvent + .getAxEvent().getParameterMap().keySet().equals(outputEventFieldNames)).findFirst(); + if (outputEventOpt.isEmpty()) { + throw new StateMachineException( + "Task output event field definition and state output event field doesn't match"); + } else { + outputEventToUpdate = outputEventOpt.get(); + } + } + updateOutputEventFields(taskOutputEvent, outputEventFields, outputEventToUpdate); + if (updateOnceFlag) { + break; + } + } + } + private void updateOutputEventFields(AxEvent taskOutputEvent, Map<String, Object> outputEventFields, + EnEvent outputEventToUpdate) throws StateMachineException { + for (Entry<String, Object> outputEventFieldEntry : outputEventFields.entrySet()) { + String fieldName = outputEventFieldEntry.getKey(); + Object fieldValue = outputEventFieldEntry.getValue(); + final AxField fieldDef = taskOutputEvent.getParameterMap().get(fieldName); + + Set<AxArtifactKey> outgoingEventSet = new TreeSet<>(); + if (null == stateOutputDefinition.getOutgoingEventSet() + || stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + // if current state is not the final state, then the set could be empty. + // Just take the outgoingEvent field in this case + outgoingEventSet.add(stateOutputDefinition.getOutgoingEvent()); + } else { + outgoingEventSet.addAll(stateOutputDefinition.getOutgoingEventSet()); + } // Check if this field is a field in the event - if (!outputEventDef.getFields().contains(fieldDef)) { - throw new StateMachineException("field \"" + fieldName + "\" does not exist on event \"" - + outputEventDef.getId() + "\""); + for (AxArtifactKey outputEventKey : outgoingEventSet) { + if (outputEventKey.equals(taskOutputEvent.getKey())) { + outputEventDef = ModelService.getModel(AxEvents.class).get(outputEventKey); + // Check if this field is a field in the state output event + if (!outputEventDef.getFields().contains(fieldDef)) { + throw new StateMachineException( + "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getId() + "\""); + } + } } - - // Set the value in the output event - outputEvent.put(fieldName, incomingFieldEntry.getValue()); + // Set the value in the correct output event + outputEventToUpdate.put(fieldName, fieldValue); } } @@ -135,25 +188,30 @@ public class StateOutput { */ public void copyUnsetFields(final EnEvent incomingEvent) { Assertions.argumentNotNull(incomingEvent, "incomingEvent may not be null"); - - for (final Entry<String, Object> incomingField : incomingEvent.entrySet()) { - final String fieldName = incomingField.getKey(); - - // Check if the field exists on the outgoing event - if ((!outputEventDef.getParameterMap().containsKey(fieldName)) - - // Check if the field is set on the outgoing event - || (outputEvent.containsKey(fieldName)) - - // Now, check the fields have the same type - || (!incomingEvent.getAxEvent().getParameterMap().get(fieldName) - .equals(outputEvent.getAxEvent().getParameterMap().get(fieldName)))) { - continue; - } - - // All checks done, we can copy the value - outputEvent.put(fieldName, incomingField.getValue()); + Set<AxArtifactKey> outgoingEventSet = new TreeSet<>(); + if (null == stateOutputDefinition.getOutgoingEventSet() + || stateOutputDefinition.getOutgoingEventSet().isEmpty()) { + // if current state is not the final state, then the set could be empty. + // Just take the outgoingEvent field in this case + outgoingEventSet.add(stateOutputDefinition.getOutgoingEvent()); + } else { + outgoingEventSet.addAll(stateOutputDefinition.getOutgoingEventSet()); } - + incomingEvent.forEach((inFieldName, inFieldValue) -> { + for (AxArtifactKey outputEventKey : outgoingEventSet) { + outputEventDef = ModelService.getModel(AxEvents.class).get(outputEventKey); + // Check if the field exists on the outgoing event + if (!outputEventDef.getParameterMap().containsKey(inFieldName) + // Check if the field is set in the outgoing event + || outputEvents.get(outputEventKey).containsKey(inFieldName) + // Now, check the fields have the same type + || !incomingEvent.getAxEvent().getParameterMap().get(inFieldName) + .equals(outputEvents.get(outputEventKey).getAxEvent().getParameterMap().get(inFieldName))) { + continue; + } + // All checks done, we can copy the value + outputEvents.get(outputEventKey).put(inFieldName, inFieldValue); + } + }); } } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java index 0150b65b2..31e27d244 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/TaskExecutor.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,6 @@ package org.onap.policy.apex.core.engine.executor; import static org.onap.policy.common.utils.validation.Assertions.argumentOfClassNotNull; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -41,8 +41,7 @@ import org.onap.policy.apex.core.engine.executor.context.TaskExecutionContext; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey; -import org.onap.policy.apex.model.eventmodel.concepts.AxInputField; -import org.onap.policy.apex.model.eventmodel.concepts.AxOutputField; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; import org.onap.policy.apex.model.policymodel.concepts.AxTask; import org.onap.policy.apex.model.policymodel.concepts.AxTaskParameter; import org.slf4j.ext.XLogger; @@ -56,7 +55,7 @@ import org.slf4j.ext.XLoggerFactory; * @author Liam Fallon (liam.fallon@ericsson.com) */ public abstract class TaskExecutor - implements Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> { + implements Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> { // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(TaskExecutor.class); @@ -67,10 +66,11 @@ public abstract class TaskExecutor // Holds the incoming and outgoing fields private Map<String, Object> incomingFields = null; - private Map<String, Object> outgoingFields = null; + private Map<String, Map<String, Object>> outgoingFieldsMap = null; // The next task executor - private Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> nextExecutor = null; + private Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextExecutor = + null; // The task execution context; contains the facades for events and context to be used by tasks // executed by this task @@ -104,7 +104,7 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Map<String, Object> execute(final long executionId, final Properties executionProperties, + public Map<String, Map<String, Object>> execute(final long executionId, final Properties executionProperties, final Map<String, Object> newIncomingFields) throws StateMachineException, ContextException { throw new StateMachineException( "execute() not implemented on abstract TaskExecutor class, only on its subclasses"); @@ -120,19 +120,13 @@ public abstract class TaskExecutor + getSubject().getKey().getId() + "," + getSubject().getTaskLogic().getLogic()); // Check that the incoming event has all the input fields for this state - final Set<String> missingTaskInputFields = new TreeSet<>(axTask.getInputFields().keySet()); + Map<String, AxField> inputEventParameterMap = axTask.getInputEvent().getParameterMap(); + final Set<String> missingTaskInputFields = new TreeSet<>(inputEventParameterMap.keySet()); missingTaskInputFields.removeAll(newIncomingFields.keySet()); // Remove fields from the set that are optional - final Set<String> optionalFields = new TreeSet<>(); - for (final Iterator<String> missingFieldIterator = missingTaskInputFields.iterator(); missingFieldIterator - .hasNext();) { - final String missingField = missingFieldIterator.next(); - if (axTask.getInputFields().get(missingField).getOptional()) { - optionalFields.add(missingField); - } - } - missingTaskInputFields.removeAll(optionalFields); + missingTaskInputFields.removeIf(missingField -> inputEventParameterMap.get(missingField).getOptional()); + if (!missingTaskInputFields.isEmpty()) { throw new StateMachineException("task input fields \"" + missingTaskInputFields + "\" are missing for task \"" + axTask.getKey().getId() + "\""); @@ -142,14 +136,15 @@ public abstract class TaskExecutor this.incomingFields = newIncomingFields; // Initiate the outgoing fields - outgoingFields = new TreeMap<>(); - for (final String outputFieldName : getSubject().getOutputFields().keySet()) { - outgoingFields.put(outputFieldName, null); + outgoingFieldsMap = new TreeMap<>(); + for (var outputEventEntry: axTask.getOutputEvents().entrySet()) { + Map<String, Object> outgoingFields = new TreeMap<>(); + outputEventEntry.getValue().getParameterMap().keySet().forEach(field -> outgoingFields.put(field, null)); + outgoingFieldsMap.put(outputEventEntry.getKey(), outgoingFields); } - // Get task context object executionContext = new TaskExecutionContext(this, executionId, executionProperties, getSubject(), getIncoming(), - getOutgoing(), getContext()); + outgoingFieldsMap.values(), getContext()); } /** @@ -167,71 +162,77 @@ public abstract class TaskExecutor throw new StateMachineException(errorMessage); } - // Copy any unset fields from the input to the output if their data type and names are - // identical - for (final String field : axTask.getOutputFields().keySet()) { - copyInputField2Output(field); - } + // Copy any unset fields from the input to the output if their data type and names are identical + axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> outputEventEntry.getValue().getParameterMap() + .keySet().forEach(field -> copyInputField2Output(outputEventEntry.getKey(), field))); // Finally, check that the outgoing fields have all the output fields defined for this state - // and, if not, output - // a list of missing fields - final Set<String> missingTaskOutputFields = new TreeSet<>(axTask.getOutputFields().keySet()); - missingTaskOutputFields.removeAll(outgoingFields.keySet()); + // and, if not, output a list of missing fields + Map<String, Set<String>> missingTaskOutputFieldsMap = new TreeMap<>(); + axTask.getOutputEvents().entrySet().forEach(outputEventEntry -> { + Set<String> missingTaskOutputFields = new TreeSet<>(); + missingTaskOutputFields.addAll(outputEventEntry.getValue().getParameterMap().keySet()); + String key = outputEventEntry.getKey(); + missingTaskOutputFields.removeAll(outgoingFieldsMap.get(key).keySet()); + missingTaskOutputFieldsMap.put(key, missingTaskOutputFields); + }); // Remove fields from the set that are optional - final Set<String> optionalOrCopiedFields = new TreeSet<>(); - for (final Iterator<String> missingFieldIterator = missingTaskOutputFields.iterator(); missingFieldIterator - .hasNext();) { - final String missingField = missingFieldIterator.next(); - if (axTask.getInputFields().containsKey(missingField) - || axTask.getOutputFields().get(missingField).getOptional()) { - optionalOrCopiedFields.add(missingField); - } - } - missingTaskOutputFields.removeAll(optionalOrCopiedFields); - if (!missingTaskOutputFields.isEmpty()) { - throw new StateMachineException("task output fields \"" + missingTaskOutputFields - + "\" are missing for task \"" + axTask.getKey().getId() + "\""); + missingTaskOutputFieldsMap.entrySet() + .forEach(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue() + .removeIf(missingField -> axTask.getInputEvent().getParameterMap().containsKey(missingField) + || axTask.getOutputEvents().get(missingTaskOutputFieldsEntry.getKey()).getParameterMap() + .get(missingField).getOptional())); + missingTaskOutputFieldsMap.entrySet() + .removeIf(missingTaskOutputFieldsEntry -> missingTaskOutputFieldsEntry.getValue().isEmpty()); + if (!missingTaskOutputFieldsMap.isEmpty()) { + throw new StateMachineException("Fields for task output events \"" + missingTaskOutputFieldsMap.keySet() + + "\" are missing for task \"" + axTask.getKey().getId() + "\""); + } // Finally, check that the outgoing field map don't have any extra fields, if present, raise - // exception with the - // list of extra fields - final Set<String> extraTaskOutputFields = new TreeSet<>(outgoingFields.keySet()); - extraTaskOutputFields.removeAll(axTask.getOutputFields().keySet()); - if (!extraTaskOutputFields.isEmpty()) { - throw new StateMachineException("task output fields \"" + extraTaskOutputFields - + "\" are unwanted for task \"" + axTask.getKey().getId() + "\""); + // exception with the list of extra fields + final Map<String, Set<String>> extraTaskOutputFieldsMap = new TreeMap<>(); + outgoingFieldsMap.entrySet().forEach(outgoingFieldsEntry -> extraTaskOutputFieldsMap + .put(outgoingFieldsEntry.getKey(), new TreeSet<>(outgoingFieldsEntry.getValue().keySet()))); + extraTaskOutputFieldsMap.entrySet().forEach(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue() + .removeAll(axTask.getOutputEvents().get(extraTaskOutputFieldsEntry.getKey()).getParameterMap().keySet())); + extraTaskOutputFieldsMap.entrySet() + .removeIf(extraTaskOutputFieldsEntry -> extraTaskOutputFieldsEntry.getValue().isEmpty()); + if (!extraTaskOutputFieldsMap.isEmpty()) { + throw new StateMachineException("task output event \"" + extraTaskOutputFieldsMap.keySet() + + "\" contains fields that are unwanted for task \"" + axTask.getKey().getId() + "\""); } - String message = "execute-post:" + axTask.getKey().getId() + ", returning fields " + outgoingFields.toString(); + String message = + "execute-post:" + axTask.getKey().getId() + ", returning fields " + outgoingFieldsMap.toString(); LOGGER.debug(message); } /** * If the input field exists on the output and it is not set in the task, then it should be copied to the output. * + * @param eventName the event name * @param field the input field */ - private void copyInputField2Output(String field) { + private void copyInputField2Output(String eventName, String field) { + Map<String, Object> outgoingFields = outgoingFieldsMap.get(eventName); // Check if the field exists and is not set on the output - if (getOutgoing().containsKey(field) && getOutgoing().get(field) != null) { + if (outgoingFields.get(field) != null) { return; } // This field is not in the output, check if it's on the input and is the same type - // (Note here, the output - // field definition has to exist so it's not - // null checked) - final AxInputField inputFieldDef = axTask.getInputFields().get(field); - final AxOutputField outputFieldDef = axTask.getOutputFields().get(field); + // (Note here, the output field definition has to exist so it's not null checked) + final AxField inputFieldDef = axTask.getInputEvent().getParameterMap().get(field); + final AxField outputFieldDef = axTask.getOutputEvents().get(eventName).getParameterMap().get(field); if (inputFieldDef == null || !inputFieldDef.getSchema().equals(outputFieldDef.getSchema())) { return; } // We have an input field that matches our output field, copy the value across - getOutgoing().put(field, getIncoming().get(field)); + outgoingFields.put(field, getIncoming().get(field)); } /** @@ -308,15 +309,16 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Map<String, Object> getOutgoing() { - return outgoingFields; + public Map<String, Map<String, Object>> getOutgoing() { + return outgoingFieldsMap; } /** * {@inheritDoc}. */ @Override - public void setNext(final Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> nextEx) { + public void setNext( + final Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> nextEx) { this.nextExecutor = nextEx; } @@ -324,7 +326,7 @@ public abstract class TaskExecutor * {@inheritDoc}. */ @Override - public Executor<Map<String, Object>, Map<String, Object>, AxTask, ApexInternalContext> getNext() { + public Executor<Map<String, Object>, Map<String, Map<String, Object>>, AxTask, ApexInternalContext> getNext() { return nextExecutor; } diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java index f3906213f..5f39bcd56 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,18 +85,34 @@ public class AxTaskFacade { */ public SchemaHelper getInFieldSchemaHelper(final String fieldName) { // Find the field for the field name - return getFieldSchemaHelper(fieldName, task.getInputFields().get(fieldName), "incoming"); + return getFieldSchemaHelper(fieldName, task.getInputEvent().getParameterMap().get(fieldName), "incoming"); } /** * Creates a schema helper for an outgoing field of this task. + * This method can be used only when there is a single event output as part of a task * * @param fieldName The name of the field to get a schema helper for * @return the schema helper for this field */ public SchemaHelper getOutFieldSchemaHelper(final String fieldName) { // Find the field for the field name - return getFieldSchemaHelper(fieldName, task.getOutputFields().get(fieldName), "outgoing"); + return getFieldSchemaHelper(fieldName, + task.getOutputEvents().values().iterator().next().getParameterMap().get(fieldName), "outgoing"); + } + + /** + * Creates a schema helper for an outgoing field of this task. + * This method can be used when there are multiple event outputs from a task + * + * @param eventName the name of the event to which the field belongs to + * @param fieldName The name of the field to get a schema helper for + * @return the schema helper for this field + */ + public SchemaHelper getOutFieldSchemaHelper(final String eventName, final String fieldName) { + // Find the field for the field name + return getFieldSchemaHelper(fieldName, task.getOutputEvents().get(eventName).getParameterMap().get(fieldName), + "outgoing"); } /** diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java index 49459dfae..a54252e05 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/TaskExecutionContext.java @@ -3,6 +3,7 @@ * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2020 Nordix Foundation. * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ package org.onap.policy.apex.core.engine.executor.context; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -73,6 +75,12 @@ public class TaskExecutionContext extends AbstractExecutionContext { public final Map<String, Object> outFields; /** + * The outgoing fields from the task. The task logic can access and set these fields with its logic. A task outputs + * its result using these fields. + */ + public final Collection<Map<String, Object>> outFieldsList; + + /** * Logger for task execution, task logic can use this field to access and log to Apex logging. */ public final XLogger logger = EXECUTION_LOGGER; @@ -97,12 +105,12 @@ public class TaskExecutionContext extends AbstractExecutionContext { * @param executionProperties the execution properties for task execution * @param axTask the task definition that is the subject of execution * @param inFields the in fields - * @param outFields the out fields + * @param outFieldsList collection of the out fields * @param internalContext the execution context of the Apex engine in which the task is being executed */ public TaskExecutionContext(final TaskExecutor taskExecutor, final long executionId, final Properties executionProperties, final AxTask axTask, final Map<String, Object> inFields, - final Map<String, Object> outFields, final ApexInternalContext internalContext) { + final Collection<Map<String, Object>> outFieldsList, final ApexInternalContext internalContext) { super(executionId, executionProperties); // The subject is the task definition @@ -113,7 +121,13 @@ public class TaskExecutionContext extends AbstractExecutionContext { // The input and output fields this.inFields = Collections.unmodifiableMap(inFields); - this.outFields = outFields; + this.outFieldsList = outFieldsList; + // if only a single output event needs to fired from a task, the outFields alone can be used too + if (outFieldsList.isEmpty()) { + this.outFields = new TreeMap<>(); + } else { + this.outFields = outFieldsList.iterator().next(); + } // Set up the context albums for this task context = new TreeMap<>(); @@ -156,7 +170,7 @@ public class TaskExecutionContext extends AbstractExecutionContext { */ public ContextAlbum getContextAlbum(final String contextAlbumName) { // Find the context album - final ContextAlbum foundContextAlbum = context.get(contextAlbumName); + final var foundContextAlbum = context.get(contextAlbumName); // Check if the context album exists if (foundContextAlbum != null) { @@ -166,4 +180,16 @@ public class TaskExecutionContext extends AbstractExecutionContext { + "\" on task \"" + subject.getId() + "\""); } } + + /** + * Method to add fields to the output event list. + * @param fields the fields to be added + */ + public void addFieldsToOutput(Map<String, Object> fields) { + for (Map<String, Object> outputFields : outFieldsList) { + if (outputFields.keySet().containsAll(fields.keySet())) { + outputFields.replaceAll((name, value) -> fields.get(name)); + } + } + } } |