aboutsummaryrefslogtreecommitdiffstats
path: root/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/engine/impl/ApexEngineImpl.java
blob: 35139bfe49a0024333de4ec8e6b27bcf61660e67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
 *  Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
 *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.apex.core.engine.engine.impl;

import static org.onap.policy.common.utils.validation.Assertions.argumentNotNull;

import io.prometheus.client.Gauge;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import lombok.Getter;
import org.onap.policy.apex.context.ContextAlbum;
import org.onap.policy.apex.context.ContextException;
import org.onap.policy.apex.core.engine.context.ApexInternalContext;
import org.onap.policy.apex.core.engine.engine.ApexEngine;
import org.onap.policy.apex.core.engine.engine.EnEventListener;
import org.onap.policy.apex.core.engine.event.EnEvent;
import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineStats;
import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.model.policymodel.concepts.AxState;
import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput;
import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskOutputType;
import org.onap.policy.apex.model.policymodel.concepts.AxStateTaskReference;
import org.onap.policy.apex.model.policymodel.concepts.AxTask;
import org.onap.policy.common.utils.resources.PrometheusUtils;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;

/**
 * This class controls the thread of execution of a single engine in an Apex system. An engine is a single thread in a
 * pool of engines that are running a set of policies. An engine is either inactive, waiting for a policy to be
 * triggered or executing a policy. The engine runs off a queue of triggers that trigger its state machine. If the queue
 * is empty, it waits for the next trigger. The Apex engine holds its state machine in a {@link StateMachineHandler}
 * instance and uses its state machine handler to execute events.
 *
 * @author Liam Fallon
 */
public class ApexEngineImpl implements ApexEngine {

    // Logger for this class
    private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEngineImpl.class);

    // Prometheus gauge that publishes the engine state as a number, one time series per engine instance id.
    // It is written by updateStatePrometheusMetric() using the engine key id as the label value.
    static final Gauge ENGINE_STATE = Gauge.build().namespace(PrometheusUtils.PdpType.PDPA.getNamespace())
        .name("engine_state").labelNames("engine_instance_id")
        .help("State of the APEX engine as integers mapped as - 0:UNDEFINED, 1:STOPPED, 2:READY,"
            + " 3:EXECUTING, 4:STOPPING").register();

    // Recurring string constants, used as prefixes of exception messages raised by the lifecycle methods
    private static final String UPDATE_MODEL = "updateModel()<-";
    private static final String START = "start()<-";
    private static final String STOP = "stop()<-";

    // The artifact key of this engine, set once in the constructor
    @Getter
    private final AxArtifactKey key;

    // The state of this engine, a new engine starts out STOPPED
    @Getter
    private AxEngineState state = AxEngineState.STOPPED;
    // Monitor that the lifecycle and event handling methods synchronize on when they check or change the state
    private final Object stateLockObj = new Object();

    // Call back listeners keyed by listener name; the linked map keeps them in registration order and
    // handleEvent() synchronizes on this map while it publishes outgoing events
    private final Map<String, EnEventListener> eventListeners = new LinkedHashMap<>();

    // The context of this engine, null until a model has been loaded with updateModel() and again after clear()
    @Getter
    private ApexInternalContext internalContext = null;

    // The state machines, rebuilt on every updateModel() call and dropped by clear()
    private StateMachineHandler stateMachineHandler = null;

    // Statistics on engine execution, created in the constructor and keyed as a child of the engine key
    private final AxEngineStats engineStats;

    /**
     * Constructor, instantiate the engine with its state machine table.
     *
     * @param key the key of the engine
     */
    protected ApexEngineImpl(final AxArtifactKey key) {
        argumentNotNull(key, "AxArtifactKey may not be null");

        LOGGER.entry("ApexEngine()->{}, {}", key.getId(), state);

        this.key = key;

        // Set up statistics collection
        engineStats = new AxEngineStats();
        engineStats.setKey(new AxReferenceKey(key, "_EngineStats"));

        LOGGER.exit("ApexEngine()<-" + key.getId() + "," + state);
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The model may only be replaced while the engine is STOPPED. The input and output events of every task
     * referenced by the model are resolved first, then the internal context is created (first load) or updated
     * (subsequent loads), and finally a fresh set of state machines is built on top of that context.
     */
    @Override
    public void updateModel(final AxPolicyModel apexModel, final boolean isSubsequentInstance) throws ApexException {
        updateStatePrometheusMetric();

        if (apexModel == null) {
            throw new ApexException(UPDATE_MODEL + key.getId() + ", Apex model is not defined, it has a null value");
        }
        LOGGER.entry("updateModel()->{}, apexPolicyModel {}", key.getId(), apexModel.getKey().getId());

        // Refuse the update unless the engine is stopped
        synchronized (stateLockObj) {
            if (!state.equals(AxEngineState.STOPPED)) {
                final String notStoppedMessage = UPDATE_MODEL + key.getId()
                    + ", cannot update model, engine should be stopped but is in state " + state;
                throw new ApexException(notStoppedMessage);
            }
        }

        populateIoEventsToTask(apexModel);

        // An existing internal context is updated in place, otherwise a new one is created from the model
        try {
            if (internalContext != null) {
                internalContext.update(apexModel, isSubsequentInstance);
            } else {
                internalContext = new ApexInternalContext(apexModel);
            }
        } catch (final ContextException e) {
            final String contextMessage =
                UPDATE_MODEL + key.getId() + ", error setting the context for engine \"" + key.getId() + "\"";
            throw new ApexException(contextMessage, e);
        }

        // Only the context carries over between models, the state machines are always created from scratch
        try {
            stateMachineHandler = new StateMachineHandler(internalContext);
        } catch (final StateMachineException e) {
            final String stateMachineMessage =
                UPDATE_MODEL + key.getId() + ", error setting up the engine state machines \"" + key.getId() + "\"";
            throw new ApexException(stateMachineMessage, e);
        }

        LOGGER.exit(UPDATE_MODEL + key.getId());
    }


    /**
     * Walks every state of every policy in the model and, for each task the state references, sets the state's
     * trigger event as the task's input event and resolves the task's output events from the state outputs.
     *
     * @param apexPolicyModel the policy model whose tasks are to be populated
     */
    private void populateIoEventsToTask(AxPolicyModel apexPolicyModel) {
        // Tasks handled so far, so that outputs of a task seen again are merged rather than replaced
        final Set<AxArtifactKey> handledTasks = new TreeSet<>();

        for (var policy : apexPolicyModel.getPolicies().getPolicyMap().values()) {
            for (var policyState : policy.getStateMap().values()) {
                final AxEvent stateTrigger = apexPolicyModel.getEvents().get(policyState.getTrigger());

                for (var taskReferenceEntry : policyState.getTaskReferences().entrySet()) {
                    final AxArtifactKey referencedTaskKey = taskReferenceEntry.getKey();
                    final AxStateTaskReference taskReference = taskReferenceEntry.getValue();

                    final AxTask referencedTask = apexPolicyModel.getTasks().getTaskMap().get(referencedTaskKey);
                    referencedTask.setInputEvent(stateTrigger);
                    updateTaskBasedOnStateOutput(apexPolicyModel, handledTasks, policyState, referencedTaskKey,
                        taskReference, referencedTask);
                    handledTasks.add(referencedTaskKey);
                }
            }
        }
    }

    /**
     * Resolves the output events of a task from the state output that the task reference points at and stores them
     * on the task. Nothing is changed when no matching state output is found.
     *
     * @param apexPolicyModel the policy model, used to look up events by key
     * @param updatedTasks    the keys of the tasks that were already handled during this model update
     * @param state           the state that references the task
     * @param taskKey         the key of the task
     * @param taskRef         the reference from the state to the task
     * @param task            the task whose output events are set or extended
     */
    private void updateTaskBasedOnStateOutput(AxPolicyModel apexPolicyModel, Set<AxArtifactKey> updatedTasks,
        AxState state, AxArtifactKey taskKey, AxStateTaskReference taskRef, AxTask task) {
        // With state finalizer logic the outgoing event is the same for all state outputs, so any entry will do;
        // otherwise the output named by the task reference is used
        final boolean selectedByLogic = AxStateTaskOutputType.LOGIC.equals(taskRef.getStateTaskOutputType());
        final AxStateOutput stateOutput = selectedByLogic
            ? state.getStateOutputs().values().iterator().next()
            : state.getStateOutputs().get(taskRef.getOutput().getLocalName());
        if (stateOutput == null) {
            return;
        }

        // A state output without an event set gets one that holds just its single outgoing event
        final var currentEventSet = stateOutput.getOutgoingEventSet();
        if (currentEventSet == null || currentEventSet.isEmpty()) {
            final Set<AxArtifactKey> singleEventSet = new TreeSet<>();
            singleEventSet.add(stateOutput.getOutgoingEvent());
            stateOutput.setOutgoingEventSet(singleEventSet);
        }

        final Map<String, AxEvent> resolvedEvents = new TreeMap<>();
        final var nextStates = state.getNextStateSet();
        if (nextStates.isEmpty() || nextStates.contains(AxReferenceKey.getNullKey().getLocalName())) {
            // No real next state: every event of the outgoing event set is a possible task output
            for (var eventKey : stateOutput.getOutgoingEventSet()) {
                resolvedEvents.put(eventKey.getName(), apexPolicyModel.getEvents().get(eventKey));
            }
        } else {
            // There is a next state: only the single outgoing event is used
            final AxArtifactKey eventKey = stateOutput.getOutgoingEvent();
            resolvedEvents.put(eventKey.getName(), apexPolicyModel.getEvents().get(eventKey));
        }

        if (!updatedTasks.contains(taskKey)) {
            task.setOutputEvents(resolvedEvents);
        } else {
            // this happens only when same task is used by multiple policies
            // with different eventName but same fields
            task.getOutputEvents().putAll(resolvedEvents);
        }
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The engine must be STOPPED and must have had a model loaded with
     * {@link #updateModel(AxPolicyModel, boolean)}. The state machines are started, the start is recorded in the
     * engine statistics and the engine moves to state READY.
     *
     * @throws ApexException if the engine is not STOPPED, if no model has been loaded, or if the state machines
     *     fail to start
     */
    @Override
    public void start() throws ApexException {
        LOGGER.entry("start() {}", key);
        synchronized (stateLockObj) {
            if (state != AxEngineState.STOPPED) {
                String message =
                    START + key.getId() + "," + state + ", cannot start engine, engine not in state STOPPED";
                throw new ApexException(message);
            }
        }

        // Both the handler and the context are created by updateModel(), so either being absent means no model
        if (stateMachineHandler == null || internalContext == null) {
            throw new ApexException(START + key.getId() + "," + state
                + ",  cannot start engine, engine has not been initialized, its model is not loaded");
        }

        try {
            // Start the state machines
            stateMachineHandler.start();
            engineStats.engineStart();
        } catch (final StateMachineException e) {
            // NOTE(review): the message carries the UPDATE_MODEL prefix although it is raised from start(); the
            // text is kept unchanged because callers and tests may match on it
            String message =
                UPDATE_MODEL + key.getId() + ", error starting the engine state machines \"" + key.getId() + "\"";
            throw new ApexException(message, e);
        }

        // OK, we are good to go. The transition is made while holding the state lock, like the transitions in
        // stop() and handleEvent(), so that threads reading the state under the lock are guaranteed to see it
        synchronized (stateLockObj) {
            state = AxEngineState.READY;
        }
        updateStatePrometheusMetric();

        LOGGER.exit("start()" + key);
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The engine state is polled at a fixed increment until the engine can be stopped or the stop timeout has
     * been used up. An engine found READY (or already moved to STOPPED by {@code handleEvent()}) is stopped at
     * once; an engine found EXECUTING is marked STOPPING and polled again. If the timeout runs out the state is
     * forced to STOPPED and an exception is thrown; the state machine handler and the engine statistics are not
     * stopped on that path.
     *
     * @throws ApexException if the engine is already stopped when the method is called, if the engine is found in
     *     an unexpected state, or if the stop timed out
     */
    @Override
    public void stop() throws ApexException {
        LOGGER.entry("stop()-> {}", key);

        // Check if the engine is already stopped
        synchronized (stateLockObj) {
            if (state == AxEngineState.STOPPED) {
                throw new ApexException(
                    STOP + key.getId() + "," + state + ", cannot stop engine, engine is already stopped");
            }
        }
        // Stop the engine if it is in state READY, if it is in state EXECUTING, wait for execution to finish
        // The loop variable counts down the remaining timeout budget; each pass sleeps for one increment BEFORE
        // the state is inspected, so even a READY engine waits one increment before it is stopped
        for (int increment = ApexEngineConstants.STOP_EXECUTION_WAIT_TIMEOUT; increment > 0;
            increment -= ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT) {
            ThreadUtilities.sleep(ApexEngineConstants.APEX_ENGINE_STOP_EXECUTION_WAIT_INCREMENT);

            synchronized (stateLockObj) {
                switch (state) {
                    // Engine is OK to stop or has been stopped on return of an event
                    // (handleEvent() moves a STOPPING engine to STOPPED when its execution completes)
                    case READY:
                    case STOPPED:
                        state = AxEngineState.STOPPED;
                        updateStatePrometheusMetric();
                        stateMachineHandler.stop();
                        engineStats.engineStop();
                        LOGGER.exit("stop()" + key);
                        return;

                    // Engine is executing a policy, wait for it to stop
                    case EXECUTING:
                        state = AxEngineState.STOPPING;
                        updateStatePrometheusMetric();
                        break;

                    // Wait for the engine to stop
                    case STOPPING:
                        break;

                    default:
                        throw new ApexException(
                            STOP + key.getId() + "," + state + ", cannot stop engine, engine is in an undefined state");
                }
            }
        }

        // Force the engine to STOPPED state
        // Reached only when the timeout budget ran out while the engine was still STOPPING
        synchronized (stateLockObj) {
            state = AxEngineState.STOPPED;
        }
        updateStatePrometheusMetric();

        throw new ApexException(STOP + key.getId() + "," + state + ", error stopping engine, engine stop timed out");
    }

    /**
     * {@inheritDoc}.
     *
     * <p>Only a STOPPED engine can be cleared. The state machines are dropped, the statistics are cleaned and the
     * internal context, if there is one, is cleared and dropped.
     */
    @Override
    public void clear() throws ApexException {
        LOGGER.entry("clear()-> {}", key);
        synchronized (stateLockObj) {
            if (state != AxEngineState.STOPPED) {
                final String notStoppedMessage =
                    "clear()<-" + key.getId() + "," + state + ", cannot clear engine, engine is not stopped";
                throw new ApexException(notStoppedMessage);
            }
        }

        // Drop the state machines and reset the statistics
        stateMachineHandler = null;
        engineStats.clean();

        // Nothing more to do when no model was ever loaded
        if (internalContext == null) {
            return;
        }
        internalContext.clear();
        internalContext = null;
    }

    /**
     * {@inheritDoc}.
     *
     * <p>Events can be created while the engine is READY or EXECUTING. In any other state, or when the event cannot
     * be constructed, a warning is logged and {@code null} is returned.
     */
    @Override
    public EnEvent createEvent(final AxArtifactKey eventKey) {
        synchronized (stateLockObj) {
            final boolean engineRunning = state == AxEngineState.READY || state == AxEngineState.EXECUTING;
            if (!engineRunning) {
                LOGGER.warn("createEvent()<-{},{}, cannot create event, engine not in state READY", key.getId(), state);
                return null;
            }
        }

        EnEvent createdEvent = null;
        try {
            createdEvent = new EnEvent(eventKey);
        } catch (final Exception e) {
            // Creation problems are reported to the caller as a null event rather than as an exception
            LOGGER.warn("createEvent()<-{},{}, error on event creation: ", key.getId(), state, e);
        }
        return createdEvent;
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The event is only handled when the engine is READY. The engine is EXECUTING while the state machine runs
     * and while the outgoing events are handed to the registered listeners. Afterwards it returns to READY, or
     * moves to STOPPED when a stop was requested in the meantime. The state is restored even when execution or a
     * listener fails with an unchecked exception, so that such a failure cannot leave the engine stuck in
     * EXECUTING.
     *
     * @param incomingEvent the event that triggers execution
     * @return {@code true} if the state machine executed without a {@link StateMachineException} and all outgoing
     *     events were published without an {@link ApexException}, {@code false} otherwise
     */
    @Override
    public boolean handleEvent(final EnEvent incomingEvent) {
        if (incomingEvent == null) {
            LOGGER.warn("handleEvent()<-{},{}, cannot run engine, incoming event is null", key.getId(), state);
            return false;
        }

        synchronized (stateLockObj) {
            if (state != AxEngineState.READY) {
                LOGGER.warn("handleEvent()<-{},{}, cannot run engine, engine not in state READY", key.getId(), state);
                return false;
            }

            state = AxEngineState.EXECUTING;
        }
        updateStatePrometheusMetric();

        try {
            LOGGER.debug("execute(): triggered by event {}", incomingEvent);

            var executed = false;
            Collection<EnEvent> outgoingEvents;
            try {
                engineStats.executionEnter(incomingEvent.getKey());
                outgoingEvents = stateMachineHandler.execute(incomingEvent);
                engineStats.executionExit();
                executed = true;
            } catch (final StateMachineException e) {
                LOGGER.warn("handleEvent()<-{},{}, engine execution error: ", key.getId(), state, e);

                // Report the failure to the listeners as an exception event
                outgoingEvents = createExceptionEvent(incomingEvent, e);
            }

            // Publishing is attempted whether or not execution succeeded
            final boolean published = publishOutgoingEvents(outgoingEvents);
            return executed && published;
        } finally {
            synchronized (stateLockObj) {
                // Only go to READY if we are still in state EXECUTING, we go to state STOPPED if we were STOPPING
                if (state == AxEngineState.EXECUTING) {
                    state = AxEngineState.READY;
                } else if (state == AxEngineState.STOPPING) {
                    state = AxEngineState.STOPPED;
                }
            }
            updateStatePrometheusMetric();
        }
    }

    /**
     * Hands every outgoing event to every registered listener, in listener registration order.
     *
     * @param outgoingEvents the events to publish, a {@code null} collection is treated as nothing to publish
     * @return {@code true} if no listener raised an {@link ApexException}, {@code false} otherwise
     */
    private boolean publishOutgoingEvents(final Collection<EnEvent> outgoingEvents) {
        try {
            synchronized (eventListeners) {
                if (eventListeners.isEmpty()) {
                    LOGGER.debug("handleEvent()<-{},{}, There is no listener registered to recieve outgoing event: {}",
                        key.getId(), state, outgoingEvents);
                }
                if (outgoingEvents == null) {
                    return true;
                }
                for (final EnEventListener axEventListener : eventListeners.values()) {
                    for (var outgoingEvent : outgoingEvents) {
                        axEventListener.onEnEvent(outgoingEvent);
                    }
                }
            }
            return true;
        } catch (final ApexException e) {
            LOGGER.warn("handleEvent()<-{},{}, outgoing event publishing error: ", key.getId(), state, e);
            return false;
        }
    }

    /**
     * {@inheritDoc}.
     *
     * <p>A listener registered under a name that is already in use replaces the previous listener. The listener map
     * is modified while holding its monitor, the same monitor {@code handleEvent()} holds while it iterates over the
     * listeners, so a registration cannot interfere with an ongoing publication.
     *
     * @throws ApexRuntimeException if the listener name or the listener is null
     */
    @Override
    public void addEventListener(final String listenerName, final EnEventListener listener) {
        if (listenerName == null) {
            String message = "addEventListener()<-" + key.getId() + "," + state + ", listenerName is null";
            throw new ApexRuntimeException(message);
        }

        if (listener == null) {
            String message = "addEventListener()<-" + key.getId() + "," + state + ", listener is null";
            throw new ApexRuntimeException(message);
        }

        synchronized (eventListeners) {
            eventListeners.put(listenerName, listener);
        }
    }

    /**
     * {@inheritDoc}.
     *
     * <p>Removing a name that is not registered has no effect. The listener map is modified while holding its
     * monitor, the same monitor {@code handleEvent()} holds while it iterates over the listeners, so a removal
     * cannot interfere with an ongoing publication.
     *
     * @throws ApexRuntimeException if the listener name is null
     */
    @Override
    public void removeEventListener(final String listenerName) {
        if (listenerName == null) {
            String message = "removeEventListener()<-" + key.getId() + "," + state + ", listenerName is null";
            throw new ApexRuntimeException(message);
        }

        synchronized (eventListeners) {
            eventListeners.remove(listenerName);
        }
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The returned model is built on each call from the engine key, the current time, the current state and the
     * engine's statistics object.
     */
    @Override
    public AxEngineModel getEngineStatus() {
        final AxEngineModel statusModel = new AxEngineModel(key);

        final long now = System.currentTimeMillis();
        statusModel.setTimestamp(now);
        statusModel.setState(state);
        statusModel.setStats(engineStats);

        return statusModel;
    }

    /**
     * {@inheritDoc}.
     *
     * <p>The returned map is a new map on every call and is empty when no model has been loaded. Its values are the
     * engine's context albums themselves, not copies of them.
     */
    @Override
    public Map<AxArtifactKey, Map<String, Object>> getEngineContext() {
        final Map<AxArtifactKey, Map<String, Object>> albumsByKey = new LinkedHashMap<>();

        if (internalContext != null) {
            albumsByKey.putAll(internalContext.getContextAlbums());
        }

        return albumsByKey;
    }

    /**
     * Builds the event that reports a failed execution. The event is a clone of the incoming event that carries
     * the message of the exception followed by the messages of all its causes, one "caused by" line per cause.
     *
     * @param incomingEvent  The incoming event that caused the exception
     * @param eventException The exception that was thrown
     * @return a set holding the single exception event
     */
    private Set<EnEvent> createExceptionEvent(final EnEvent incomingEvent, final Exception eventException) {
        final EnEvent failureEvent = (EnEvent) incomingEvent.clone();

        // Start with the top level message, then walk down the cause chain
        final var cascadedMessage = new StringBuilder().append(eventException.getMessage());
        for (Throwable cause = eventException.getCause(); cause != null; cause = cause.getCause()) {
            cascadedMessage.append("\ncaused by: ").append(cause.getMessage());
        }

        failureEvent.setExceptionMessage(cascadedMessage.toString());

        return Set.of(failureEvent);
    }

    /**
     * Publishes the current engine state to the {@code engine_state} Prometheus gauge, using the id of the engine
     * key as the label value.
     */
    private void updateStatePrometheusMetric() {
        final var engineGauge = ENGINE_STATE.labels(getKey().getId());
        engineGauge.set(state.getStateIdentifier());
    }
}