aboutsummaryrefslogtreecommitdiffstats
path: root/core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java')
-rw-r--r--core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java532
1 files changed, 532 insertions, 0 deletions
diff --git a/core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java b/core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java
new file mode 100644
index 000000000..35139bfe4
--- /dev/null
+++ b/core/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java
@@ -0,0 +1,532 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
+ * Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.engine.engine.impl;
+
+import static org.onap.policy.common.utils.validation.Assertions.argumentNotNull;
+
+import io.prometheus.client.Gauge;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import lombok.Getter;
+import org.onap.policy.apex.context.ContextAlbum;
+import org.onap.policy.apex.context.ContextException;
+import org.onap.policy.apex.core.engine.context.ApexInternalContext;
+import org.onap.policy.apex.core.engine.engine.ApexEngine;
+import org.onap.policy.apex.core.engine.engine.EnEventListener;
+import org.onap.policy.apex.core.engine.event.EnEvent;
+import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
+import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey;
+import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
+import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
+import org.onap.policy.apex.model.enginemodel.concepts.AxEngineStats;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
+import org.onap.policy.apex.model.policymodel.concepts.AxState;
+import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput;
+import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskOutputType;
+import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskReference;
+import org.onap.policy.apex.model.policymodel.concepts.AxTask;
+import org.onap.policy.common.utils.resources.PrometheusUtils;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class controls the thread of execution of a single engine in an Apex system. An engine is a single thread in a
+ * pool of engines that are running a set of policies. An engine is either inactive, waiting for a policy to be
+ * triggered or executing a policy. The engine runs off a queue of triggers that trigger its state machine. If the queue
+ * is empty, it waits for the next trigger. The Apex engine holds its state machine in a {@link StateMachineHandler}
+ * instance and uses its state machine handler to execute events.
+ *
+ * @author Liam Fallon
+ */
+public class ApexEngineImpl implements ApexEngine {
+
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEngineImpl.class);
+
+ // Register state changes with prometheus
+ static final Gauge ENGINE_STATE = Gauge.build().namespace(PrometheusUtils.PdpType.PDPA.getNamespace())
+ .name("engine_state").labelNames("engine_instance_id")
+ .help("State of the APEX engine as integers mapped as - 0:UNDEFINED, 1:STOPPED, 2:READY,"
+ + " 3:EXECUTING, 4:STOPPING").register();
+
+ // Recurring string constants
+ private static final String UPDATE_MODEL = "updateModel()<-";
+ private static final String START = "start()<-";
+ private static final String STOP = "stop()<-";
+
+ // The artifact key of this engine
+ @Getter
+ private final AxArtifactKey key;
+
+ // The state of this engine
+ @Getter
+ private AxEngineState state = AxEngineState.STOPPED;
+ private final Object stateLockObj = new Object();
+
+ // call back listeners
+ private final Map<String, EnEventListener> eventListeners = new LinkedHashMap<>();
+
+ // The context of this engine
+ @Getter
+ private ApexInternalContext internalContext = null;
+
+ // The state machines
+ private StateMachineHandler stateMachineHandler = null;
+
+ // Statistics on engine execution
+ private final AxEngineStats engineStats;
+
+ /**
+ * Constructor, instantiate the engine with its state machine table.
+ *
+ * @param key the key of the engine
+ */
+ protected ApexEngineImpl(final AxArtifactKey key) {
+ argumentNotNull(key, "AxArtifactKey may not be null");
+
+ LOGGER.entry("ApexEngine()->{}, {}", key.getId(), state);
+
+ this.key = key;
+
+ // Set up statistics collection
+ engineStats = new AxEngineStats();
+ engineStats.setKey(new AxReferenceKey(key, "_EngineStats"));
+
+ LOGGER.exit("ApexEngine()<-" + key.getId() + "," + state);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void updateModel(final AxPolicyModel apexModel, final boolean isSubsequentInstance) throws ApexException {
+ updateStatePrometheusMetric();
+ if (apexModel != null) {
+ LOGGER.entry("updateModel()->{}, apexPolicyModel {}", key.getId(), apexModel.getKey().getId());
+ } else {
+ throw new ApexException(UPDATE_MODEL + key.getId() + ", Apex model is not defined, it has a null value");
+ }
+
+ // The engine must be stopped in order to do a model update
+ synchronized (stateLockObj) {
+ if (!state.equals(AxEngineState.STOPPED)) {
+ throw new ApexException(
+ UPDATE_MODEL + key.getId() + ", cannot update model, engine should be stopped but is in state "
+ + state);
+ }
+ }
+
+ populateIoEventsToTask(apexModel);
+
+ // Create new internal context or update the existing one
+ try {
+ if (internalContext == null) {
+ /// New internal context
+ internalContext = new ApexInternalContext(apexModel);
+ } else {
+ // Existing internal context which must be updated
+ internalContext.update(apexModel, isSubsequentInstance);
+ }
+ } catch (final ContextException e) {
+ throw new ApexException(
+ UPDATE_MODEL + key.getId() + ", error setting the context for engine \"" + key.getId() + "\"", e);
+ }
+
+ // Set up the state machines
+ try {
+ // We always set up state machines as new because it's only context that must be transferred; policies are
+ // always set up as new
+ stateMachineHandler = new StateMachineHandler(internalContext);
+ } catch (final StateMachineException e) {
+ throw new ApexException(
+ UPDATE_MODEL + key.getId() + ", error setting up the engine state machines \"" + key.getId() + "\"", e);
+ }
+
+ LOGGER.exit(UPDATE_MODEL + key.getId());
+ }
+
+
+ private void populateIoEventsToTask(AxPolicyModel apexPolicyModel) {
+ Set<AxArtifactKey> updatedTasks = new TreeSet<>();
+ for (var axPolicy : apexPolicyModel.getPolicies().getPolicyMap().values()) {
+ for (var axState : axPolicy.getStateMap().values()) {
+ AxEvent triggerEvent = apexPolicyModel.getEvents().get(axState.getTrigger());
+ axState.getTaskReferences().forEach((taskKey, taskRef) -> {
+ AxTask task = apexPolicyModel.getTasks().getTaskMap().get(taskKey);
+ task.setInputEvent(triggerEvent);
+ updateTaskBasedOnStateOutput(apexPolicyModel, updatedTasks, axState, taskKey, taskRef, task);
+ updatedTasks.add(taskKey);
+ });
+ }
+ }
+ }
+
+ private void updateTaskBasedOnStateOutput(AxPolicyModel apexPolicyModel, Set<AxArtifactKey> updatedTasks,
+ AxState state, AxArtifactKey taskKey, AxStateTaskReference taskRef, AxTask task) {
+ Map<String, AxEvent> outputEvents = new TreeMap<>();
+ AxStateOutput stateOutput = null;
+ if (AxStateTaskOutputType.LOGIC.equals(taskRef.getStateTaskOutputType())) {
+ // in case of SFL, outgoing event will be same for all state outputs that are part of SFL.So, take any entry
+ stateOutput = state.getStateOutputs().values().iterator().next();
+ } else {
+ stateOutput = state.getStateOutputs().get(taskRef.getOutput().getLocalName());
+ }
+ if (null != stateOutput) {
+ if (null == stateOutput.getOutgoingEventSet() || stateOutput.getOutgoingEventSet().isEmpty()) {
+ Set<AxArtifactKey> outEventSet = new TreeSet<>();
+ outEventSet.add(stateOutput.getOutgoingEvent());
+ stateOutput.setOutgoingEventSet(outEventSet);
+ }
+ if (state.getNextStateSet().isEmpty()
+ || state.getNextStateSet().contains(AxReferenceKey.getNullKey().getLocalName())) {
+ stateOutput.getOutgoingEventSet().forEach(outgoingEventKey -> outputEvents
+ .put(outgoingEventKey.getName(), apexPolicyModel.getEvents().get(outgoingEventKey)));
+ } else {
+ AxArtifactKey outgoingEventKey = stateOutput.getOutgoingEvent();
+ outputEvents.put(outgoingEventKey.getName(), apexPolicyModel.getEvents().get(outgoingEventKey));
+ }
+ if (updatedTasks.contains(taskKey)) {
+ // this happens only when same task is used by multiple policies
+ // with different eventName but same fields
+ task.getOutputEvents().putAll(outputEvents);
+ } else {
+ task.setOutputEvents(outputEvents);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void start() throws ApexException {
+ LOGGER.entry("start() {}", key);
+ synchronized (stateLockObj) {
+ if (state != AxEngineState.STOPPED) {
+ String message =
+ START + key.getId() + "," + state + ", cannot start engine, engine not in state STOPPED";
+ throw new ApexException(message);
+ }
+ }
+
+ if (stateMachineHandler == null || internalContext == null) {
+ throw new ApexException(START + key.getId() + "," + state
+ + ", cannot start engine, engine has not been initialized, its model is not loaded");
+ }
+
+ // Set up the state machines
+ try {
+ // Start the state machines
+ stateMachineHandler.start();
+ engineStats.engineStart();
+ } catch (final StateMachineException e) {
+ String message =
+ UPDATE_MODEL + key.getId() + ", error starting the engine state machines \"" + key.getId() + "\"";
+ throw new ApexException(message, e);
+ }
+
+ // OK, we are good to go
+ state = AxEngineState.READY;
+ updateStatePrometheusMetric();
+
+ LOGGER.exit("start()" + key);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void stop() throws ApexException {
+ LOGGER.entry("stop()-> {}", key);
+
+ // Check if the engine is already stopped
+ synchronized (stateLockObj) {
+ if (state == AxEngineState.STOPPED) {
+ throw new ApexException(
+ STOP + key.getId() + "," + state + ", cannot stop engine, engine is already stopped");
+ }
+ }
+ // Stop the engine if it is in state READY, if it is in state EXECUTING, wait for execution to finish
+ for (int increment = ApexEngineConstants.STOP_EXECUTION_WAIT_TIMEOUT; increment > 0;
+ increment -= ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT) {
+ ThreadUtilities.sleep(ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT);
+
+ synchronized (stateLockObj) {
+ switch (state) {
+ // Engine is OK to stop or has been stopped on return of an event
+ case READY:
+ case STOPPED:
+ state = AxEngineState.STOPPED;
+ updateStatePrometheusMetric();
+ stateMachineHandler.stop();
+ engineStats.engineStop();
+ LOGGER.exit("stop()" + key);
+ return;
+
+ // Engine is executing a policy, wait for it to stop
+ case EXECUTING:
+ state = AxEngineState.STOPPING;
+ updateStatePrometheusMetric();
+ break;
+
+ // Wait for the engine to stop
+ case STOPPING:
+ break;
+
+ default:
+ throw new ApexException(
+ STOP + key.getId() + "," + state + ", cannot stop engine, engine is in an undefined state");
+ }
+ }
+ }
+
+ // Force the engine to STOPPED state
+ synchronized (stateLockObj) {
+ state = AxEngineState.STOPPED;
+ }
+ updateStatePrometheusMetric();
+
+ throw new ApexException(STOP + key.getId() + "," + state + ", error stopping engine, engine stop timed out");
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void clear() throws ApexException {
+ LOGGER.entry("clear()-> {}", key);
+ synchronized (stateLockObj) {
+ if (state != AxEngineState.STOPPED) {
+ throw new ApexException(
+ "clear" + "()<-" + key.getId() + "," + state + ", cannot clear engine, engine is not stopped");
+ }
+ }
+
+ // Clear everything
+ stateMachineHandler = null;
+ engineStats.clean();
+
+ if (internalContext != null) {
+ internalContext.clear();
+ internalContext = null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public EnEvent createEvent(final AxArtifactKey eventKey) {
+ synchronized (stateLockObj) {
+ if (state != AxEngineState.READY && state != AxEngineState.EXECUTING) {
+ LOGGER.warn("createEvent()<-{},{}, cannot create event, engine not in state READY", key.getId(), state);
+ return null;
+ }
+ }
+
+ try {
+ // Create an event using the internal context
+ return new EnEvent(eventKey);
+ } catch (final Exception e) {
+ LOGGER.warn("createEvent()<-{},{}, error on event creation: ", key.getId(), state, e);
+ return null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public boolean handleEvent(final EnEvent incomingEvent) {
+ var ret = false;
+ if (incomingEvent == null) {
+ LOGGER.warn("handleEvent()<-{},{}, cannot run engine, incoming event is null", key.getId(), state);
+ return ret;
+ }
+
+ synchronized (stateLockObj) {
+ if (state != AxEngineState.READY) {
+ LOGGER.warn("handleEvent()<-{},{}, cannot run engine, engine not in state READY", key.getId(), state);
+ return ret;
+ }
+
+ state = AxEngineState.EXECUTING;
+ }
+ updateStatePrometheusMetric();
+
+ String message = "execute(): triggered by event " + incomingEvent.toString();
+ LOGGER.debug(message);
+
+ // By default we return a null event on errors
+ Collection<EnEvent> outgoingEvents = null;
+ try {
+ engineStats.executionEnter(incomingEvent.getKey());
+ outgoingEvents = stateMachineHandler.execute(incomingEvent);
+ engineStats.executionExit();
+ ret = true;
+ } catch (final StateMachineException e) {
+ LOGGER.warn("handleEvent()<-{},{}, engine execution error: ", key.getId(), state, e);
+
+ // Create an exception return event
+ outgoingEvents = createExceptionEvent(incomingEvent, e);
+ }
+
+ // Publish the outgoing event
+ try {
+ synchronized (eventListeners) {
+ if (eventListeners.isEmpty()) {
+ LOGGER.debug("handleEvent()<-{},{}, There is no listener registered to recieve outgoing event: {}",
+ key.getId(), state, outgoingEvents);
+ }
+ for (final EnEventListener axEventListener : eventListeners.values()) {
+ for (var outgoingEvent : outgoingEvents) {
+ axEventListener.onEnEvent(outgoingEvent);
+ }
+ }
+ }
+ } catch (final ApexException e) {
+ LOGGER.warn("handleEvent()<-{},{}, outgoing event publishing error: ", key.getId(), state, e);
+ ret = false;
+ }
+ synchronized (stateLockObj) {
+ // Only go to READY if we are still in state EXECUTING, we go to state STOPPED if we were STOPPING
+ if (state == AxEngineState.EXECUTING) {
+ state = AxEngineState.READY;
+ } else if (state == AxEngineState.STOPPING) {
+ state = AxEngineState.STOPPED;
+ }
+ }
+ updateStatePrometheusMetric();
+ return ret;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void addEventListener(final String listenerName, final EnEventListener listener) {
+ if (listenerName == null) {
+ String message = "addEventListener()<-" + key.getId() + "," + state + ", listenerName is null";
+ throw new ApexRuntimeException(message);
+ }
+
+ if (listener == null) {
+ String message = "addEventListener()<-" + key.getId() + "," + state + ", listener is null";
+ throw new ApexRuntimeException(message);
+ }
+
+ eventListeners.put(listenerName, listener);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void removeEventListener(final String listenerName) {
+ if (listenerName == null) {
+ String message = "removeEventListener()<-" + key.getId() + "," + state + ", listenerName is null";
+ throw new ApexRuntimeException(message);
+ }
+
+ eventListeners.remove(listenerName);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public AxEngineModel getEngineStatus() {
+ final var engineModel = new AxEngineModel(key);
+ engineModel.setTimestamp(System.currentTimeMillis());
+ engineModel.setState(state);
+ engineModel.setStats(engineStats);
+ return engineModel;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public Map<AxArtifactKey, Map<String, Object>> getEngineContext() {
+ final Map<AxArtifactKey, Map<String, Object>> currentContext = new LinkedHashMap<>();
+
+ if (internalContext == null) {
+ return currentContext;
+ }
+
+ for (final Entry<AxArtifactKey, ContextAlbum> contextAlbumEntry : internalContext.getContextAlbums()
+ .entrySet()) {
+ currentContext.put(contextAlbumEntry.getKey(), contextAlbumEntry.getValue());
+ }
+
+ return currentContext;
+ }
+
+ /**
+ * Create an exception event from the incoming event including the exception information on the event.
+ *
+ * @param incomingEvent The incoming event that caused the exception
+ * @param eventException The exception that was thrown
+ * @return the exception event
+ */
+ private Set<EnEvent> createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) {
+ // The exception event is a clone of the incoming event with the exception suffix added to
+ // its name and an extra
+ // field "ExceptionMessage" added
+ final EnEvent exceptionEvent = (EnEvent) incomingEvent.clone();
+
+ // Create the cascaded message string
+ final var exceptionMessageStringBuilder = new StringBuilder();
+ exceptionMessageStringBuilder.append(eventException.getMessage());
+
+ Throwable subException = eventException.getCause();
+ while (subException != null) {
+ exceptionMessageStringBuilder.append("\ncaused by: ");
+ exceptionMessageStringBuilder.append(subException.getMessage());
+ subException = subException.getCause();
+ }
+
+ // Set the exception message on the event
+ exceptionEvent.setExceptionMessage(exceptionMessageStringBuilder.toString());
+
+ return Set.of(exceptionEvent);
+ }
+
+ /**
+ * Update the APEX engine state to prometheus for monitoring.
+ */
+ private void updateStatePrometheusMetric() {
+ ENGINE_STATE.labels(getKey().getId()).set(state.getStateIdentifier());
+ }
+} \ No newline at end of file