aboutsummaryrefslogtreecommitdiffstats
path: root/controlloop/common/eventmanager/src/main/java/org/onap/policy/controlloop/eventmanager/ControlLoopOperationManager2.java
blob: 6bdaa1575fb8a29cfa09e2b504a238b8617cdd61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
/*-
 * ============LICENSE_START=======================================================
 * ONAP
 * ================================================================================
 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved.
 * Modifications Copyright (C) 2019 Tech Mahindra
 * Modifications Copyright (C) 2019 Bell Canada.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.controlloop.eventmanager;

import java.io.Serializable;
import java.time.Instant;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.ToString;
import org.onap.policy.aai.AaiConstants;
import org.onap.policy.aai.AaiCqResponse;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineUtil;
import org.onap.policy.controlloop.policy.Policy;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages a single Operation for a single event. Once this has been created,
 * {@link #start()} should be invoked, and then {@link #nextStep()} should be invoked
 * continually until it returns {@code false}, indicating that all steps have completed.
 */
@ToString(onlyExplicitlyIncluded = true)
public class ControlLoopOperationManager2 implements Serializable {
    private static final long serialVersionUID = -3773199283624595410L;
    private static final Logger logger = LoggerFactory.getLogger(ControlLoopOperationManager2.class);
    private static final String CL_TIMEOUT_ACTOR = "-CL-TIMEOUT-";
    public static final String LOCK_ACTOR = "LOCK";
    public static final String LOCK_OPERATION = "Lock";
    private static final String GUARD_ACTOR = "GUARD";
    public static final String VSERVER_VSERVER_NAME = "vserver.vserver-name";
    public static final String GENERIC_VNF_VNF_NAME = "generic-vnf.vnf-name";
    public static final String GENERIC_VNF_VNF_ID = "generic-vnf.vnf-id";
    public static final String PNF_NAME = "pnf.pnf-name";

    // @formatter:off
    public enum State {
        ACTIVE,
        LOCK_DENIED,
        LOCK_LOST,
        GUARD_STARTED,
        GUARD_PERMITTED,
        GUARD_DENIED,
        OPERATION_SUCCESS,
        OPERATION_FAILURE,
        CONTROL_LOOP_TIMEOUT
    }
    // @formatter:on

    private final transient ManagerContext operContext;
    private final transient ControlLoopEventContext eventContext;
    private final Policy policy;

    @Getter
    @ToString.Include
    private State state = State.ACTIVE;

    @ToString.Include
    private final String requestId;

    @ToString.Include
    private final String policyId;

    /**
     * Bumped each time the "complete" callback is invoked by the Actor, provided it's for
     * this operation.
     */
    @ToString.Include
    private int attempts = 0;

    private final Deque<Operation> operationHistory = new ConcurrentLinkedDeque<>();

    /**
     * Queue of outcomes yet to be processed. Outcomes are added to this each time the
     * "start" or "complete" callback is invoked.
     */
    @Getter(AccessLevel.PROTECTED)
    private final transient Deque<OperationOutcome> outcomes = new ConcurrentLinkedDeque<>();

    /**
     * Used to cancel the running operation.
     */
    @Getter(AccessLevel.PROTECTED)
    private transient CompletableFuture<OperationOutcome> future = null;

    /**
     * Target entity. Determined after the lock is granted, though it may require the
     * custom query to be performed first.
     */
    @Getter
    private String targetEntity;

    @Getter(AccessLevel.PROTECTED)
    private final transient ControlLoopOperationParams params;
    private final transient PipelineUtil taskUtil;

    /**
     * Time when the lock was first requested.
     */
    private transient AtomicReference<Instant> lockStart = new AtomicReference<>();

    // values extracted from the policy
    @Getter
    private final String actor;
    @Getter
    private final String operation;


    /**
     * Construct an instance.
     *
     * @param operContext this operation's context
     * @param context event context
     * @param policy operation's policy
     * @param executor executor for the Operation
     */
    public ControlLoopOperationManager2(ManagerContext operContext, ControlLoopEventContext context, Policy policy,
                    Executor executor) {

        this.operContext = operContext;
        this.eventContext = context;
        this.policy = policy;
        this.requestId = context.getEvent().getRequestId().toString();
        this.policyId = "" + policy.getId();
        this.actor = policy.getActor();
        this.operation = policy.getRecipe();

        // @formatter:off
        params = ControlLoopOperationParams.builder()
                        .actorService(operContext.getActorService())
                        .actor(actor)
                        .operation(operation)
                        .context(context)
                        .executor(executor)
                        .target(policy.getTarget())
                        .startCallback(this::onStart)
                        .completeCallback(this::onComplete)
                        .build();
        // @formatter:on

        taskUtil = new PipelineUtil(params);
    }

    //
    // Internal class used for tracking
    //
    @Getter
    @ToString
    private class Operation implements Serializable {
        private static final long serialVersionUID = 1L;

        private int attempt;
        private PolicyResult policyResult;
        private ControlLoopOperation clOperation;

        /**
         * Constructs the object.
         *
         * @param outcome outcome of the operation
         */
        public Operation(OperationOutcome outcome) {
            attempt = ControlLoopOperationManager2.this.attempts;
            policyResult = outcome.getResult();
            clOperation = outcome.toControlLoopOperation();
        }
    }

    /**
     * Start the operation, first acquiring any locks that are needed. This should not
     * throw any exceptions, but will, instead, invoke the callbacks with exceptions.
     *
     * @param remainingMs time remaining, in milliseconds, for the control loop
     */
    @SuppressWarnings("unchecked")
    public synchronized void start(long remainingMs) {
        // this is synchronized while we update "future"

        try {
            // provide a default, in case something fails before requestLock() is called
            lockStart.set(Instant.now());

            // @formatter:off
            future = taskUtil.sequence(
                this::detmTarget,
                this::requestLock,
                this::startOperation);
            // @formatter:on

            // handle any exceptions that may be thrown, set timeout, and handle timeout

            // @formatter:off
            future.exceptionally(this::handleException)
                    .orTimeout(remainingMs, TimeUnit.MILLISECONDS)
                    .exceptionally(this::handleTimeout);
            // @formatter:on

        } catch (RuntimeException e) {
            handleException(e);
        }
    }

    /**
     * Start the operation, after the lock has been acquired.
     *
     * @return
     */
    private CompletableFuture<OperationOutcome> startOperation() {
        // @formatter:off
        ControlLoopOperationParams params2 = params.toBuilder()
                    .payload(new LinkedHashMap<>())
                    .retry(policy.getRetry())
                    .timeoutSec(policy.getTimeout())
                    .targetEntity(targetEntity)
                    .build();
        // @formatter:on

        if (policy.getPayload() != null) {
            params2.getPayload().putAll(policy.getPayload());
        }

        return params2.start();
    }

    /**
     * Handles exceptions that may be generated.
     *
     * @param thrown exception that was generated
     * @return {@code null}
     */
    private OperationOutcome handleException(Throwable thrown) {
        if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
            return null;
        }

        logger.warn("{}.{}: exception starting operation for {}", actor, operation, requestId, thrown);
        OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
        outcome.setStart(lockStart.get());
        outcome.setEnd(Instant.now());
        outcome.setFinalOutcome(true);
        onComplete(outcome);

        // this outcome is not used so just return "null"
        return null;
    }

    /**
     * Handles control loop timeout exception.
     *
     * @param thrown exception that was generated
     * @return {@code null}
     */
    private OperationOutcome handleTimeout(Throwable thrown) {
        logger.warn("{}.{}: control loop timeout for {}", actor, operation, requestId, thrown);

        OperationOutcome outcome = taskUtil.setOutcome(params.makeOutcome(), thrown);
        outcome.setActor(CL_TIMEOUT_ACTOR);
        outcome.setOperation(null);
        outcome.setStart(lockStart.get());
        outcome.setEnd(Instant.now());
        outcome.setFinalOutcome(true);
        onComplete(outcome);

        // cancel the operation, if it's still running
        future.cancel(false);

        // this outcome is not used so just return "null"
        return null;
    }

    /**
     * Cancels the operation.
     */
    public void cancel() {
        synchronized (this) {
            if (future == null) {
                return;
            }
        }

        future.cancel(false);
    }

    /**
     * Requests a lock on the {@link #targetEntity}.
     *
     * @return a future to await the lock
     */
    private CompletableFuture<OperationOutcome> requestLock() {
        /*
         * Failures are handled via the callback, and successes are discarded by
         * sequence(), without passing them to onComplete().
         *
         * Return a COPY of the future so that if we try to cancel it, we'll only cancel
         * the copy, not the original. This is done by tacking thenApply() onto the end.
         */
        lockStart.set(Instant.now());
        return operContext.requestLock(targetEntity, this::lockUnavailable).thenApply(outcome -> outcome);
    }

    /**
     * Indicates that the lock on the target entity is unavailable.
     *
     * @param outcome lock outcome
     */
    private void lockUnavailable(OperationOutcome outcome) {

        // Note: NEVER invoke onStart() for locks; only invoke onComplete()
        onComplete(outcome);

        /*
         * Now that we've added the lock outcome to the queue, ensure the future is
         * canceled, which may, itself, generate an operation outcome.
         */
        cancel();
    }

    /**
     * Handles responses provided via the "start" callback. Note: this is never be invoked
     * for locks; only {@link #onComplete(OperationOutcome)} is invoked for locks.
     *
     * @param outcome outcome provided to the callback
     */
    private void onStart(OperationOutcome outcome) {
        if (GUARD_ACTOR.equals(outcome.getActor())) {
            addOutcome(outcome);
        }
    }

    /**
     * Handles responses provided via the "complete" callback. Note: this is never invoked
     * for "successful" locks.
     *
     * @param outcome outcome provided to the callback
     */
    private void onComplete(OperationOutcome outcome) {

        switch (outcome.getActor()) {
            case LOCK_ACTOR:
            case GUARD_ACTOR:
            case CL_TIMEOUT_ACTOR:
                addOutcome(outcome);
                break;

            default:
                if (outcome.isFor(actor, operation)) {
                    addOutcome(outcome);
                }
                break;
        }
    }

    /**
     * Adds an outcome to {@link #outcomes}.
     *
     * @param outcome outcome to be added
     */
    private synchronized void addOutcome(OperationOutcome outcome) {
        /*
         * This is synchronized to prevent nextStep() from invoking processOutcome() at
         * the same time.
         */

        logger.debug("added outcome={} for {}", outcome, requestId);
        outcomes.add(outcome);

        if (outcomes.peekFirst() == outcomes.peekLast()) {
            // this is the first outcome in the queue - process it
            processOutcome();
        }
    }

    /**
     * Looks for the next step in the queue.
     *
     * @return {@code true} if more responses are expected, {@code false} otherwise
     */
    public synchronized boolean nextStep() {
        switch (state) {
            case LOCK_DENIED:
            case LOCK_LOST:
            case GUARD_DENIED:
            case CONTROL_LOOP_TIMEOUT:
                return false;
            default:
                break;
        }

        OperationOutcome outcome = outcomes.peek();
        if (outcome == null) {
            // empty queue
            return true;
        }

        if (outcome.isFinalOutcome() && outcome.isFor(actor, operation)) {
            return false;
        }

        // first item has been processed, remove it
        outcomes.remove();
        if (!outcomes.isEmpty()) {
            // have a new "first" item - process it
            processOutcome();
        }

        return true;
    }

    /**
     * Processes the first item in {@link #outcomes}. Sets the state, increments
     * {@link #attempts}, if appropriate, and stores the operation history in the DB.
     */
    private synchronized void processOutcome() {
        OperationOutcome outcome = outcomes.peek();
        logger.debug("process outcome={} for {}", outcome, requestId);

        switch (outcome.getActor()) {

            case CL_TIMEOUT_ACTOR:
                state = State.CONTROL_LOOP_TIMEOUT;
                break;

            case LOCK_ACTOR:
                // lock is no longer available
                if (state == State.ACTIVE) {
                    state = State.LOCK_DENIED;
                    storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Lock");
                } else {
                    state = State.LOCK_LOST;
                    storeFailureInDataBase(outcome, PolicyResult.FAILURE, "Operation aborted by Lock");
                }
                break;

            case GUARD_ACTOR:
                if (outcome.getEnd() == null) {
                    state = State.GUARD_STARTED;
                } else if (outcome.getResult() == PolicyResult.SUCCESS) {
                    state = State.GUARD_PERMITTED;
                } else {
                    state = State.GUARD_DENIED;
                    storeFailureInDataBase(outcome, PolicyResult.FAILURE_GUARD, "Operation denied by Guard");
                }
                break;

            default:
                // operation completed
                ++attempts;
                state = (outcome.getResult() == PolicyResult.SUCCESS ? State.OPERATION_SUCCESS
                                : State.OPERATION_FAILURE);
                operationHistory.add(new Operation(outcome));
                storeOperationInDataBase();
                break;
        }

        // indicate that this has changed
        operContext.updated(this);
    }

    /**
     * Get the operation, as a message.
     *
     * @return the operation, as a message
     */
    public String getOperationMessage() {
        Operation last = operationHistory.peekLast();
        return (last == null ? null : last.getClOperation().toMessage());
    }

    /**
     * Gets the operation result.
     *
     * @return the operation result
     */
    public PolicyResult getOperationResult() {
        Operation last = operationHistory.peekLast();
        return (last == null ? PolicyResult.FAILURE_EXCEPTION : last.getPolicyResult());
    }

    /**
     * Get the latest operation history.
     *
     * @return the latest operation history
     */
    public String getOperationHistory() {
        Operation last = operationHistory.peekLast();
        return (last == null ? null : last.clOperation.toHistory());
    }

    /**
     * Get the history.
     *
     * @return the list of control loop operations
     */
    public List<ControlLoopOperation> getHistory() {
        return operationHistory.stream().map(Operation::getClOperation).map(ControlLoopOperation::new)
                        .collect(Collectors.toList());
    }

    /**
     * Stores a failure in the DB.
     *
     * @param outcome operation outcome
     * @param result result to put into the DB
     * @param message message to put into the DB
     */
    private void storeFailureInDataBase(OperationOutcome outcome, PolicyResult result, String message) {
        outcome.setActor(actor);
        outcome.setOperation(operation);
        outcome.setMessage(message);
        outcome.setResult(result);

        operationHistory.add(new Operation(outcome));
        storeOperationInDataBase();
    }

    /**
     * Stores the latest operation in the DB.
     */
    private void storeOperationInDataBase() {
        operContext.getDataManager().store(requestId, eventContext.getEvent(),
                        operationHistory.peekLast().getClOperation());
    }

    /**
     * Determines the target entity.
     *
     * @return a future to determine the target entity, or {@code null} if the entity has
     *         already been determined
     */
    protected CompletableFuture<OperationOutcome> detmTarget() {
        if (policy.getTarget() == null) {
            throw new IllegalArgumentException("The target is null");
        }

        if (policy.getTarget().getType() == null) {
            throw new IllegalArgumentException("The target type is null");
        }

        switch (policy.getTarget().getType()) {
            case PNF:
                return detmPnfTarget();
            case VM:
            case VNF:
            case VFMODULE:
                return detmVfModuleTarget();
            default:
                throw new IllegalArgumentException("The target type is not supported");
        }
    }

    /**
     * Determines the PNF target entity.
     *
     * @return a future to determine the target entity, or {@code null} if the entity has
     *         already been determined
     */
    private CompletableFuture<OperationOutcome> detmPnfTarget() {
        if (!PNF_NAME.equalsIgnoreCase(eventContext.getEvent().getTarget())) {
            throw new IllegalArgumentException("Target does not match target type");
        }

        targetEntity = eventContext.getEnrichment().get(PNF_NAME);
        if (targetEntity == null) {
            throw new IllegalArgumentException("AAI section is missing " + PNF_NAME);
        }

        return null;
    }

    /**
     * Determines the VF Module target entity.
     *
     * @return a future to determine the target entity, or {@code null} if the entity has
     *         already been determined
     */
    private CompletableFuture<OperationOutcome> detmVfModuleTarget() {
        String targetFieldName = eventContext.getEvent().getTarget();
        if (targetFieldName == null) {
            throw new IllegalArgumentException("Target is null");
        }

        switch (targetFieldName.toLowerCase()) {
            case VSERVER_VSERVER_NAME:
                targetEntity = eventContext.getEnrichment().get(VSERVER_VSERVER_NAME);
                break;
            case GENERIC_VNF_VNF_ID:
                targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
                break;
            case GENERIC_VNF_VNF_NAME:
                return detmVnfName();
            default:
                throw new IllegalArgumentException("Target does not match target type");
        }

        if (targetEntity == null) {
            throw new IllegalArgumentException("Enrichment data is missing " + targetFieldName);
        }

        return null;
    }

    /**
     * Determines the VNF Name target entity.
     *
     * @return a future to determine the target entity, or {@code null} if the entity has
     *         already been determined
     */
    @SuppressWarnings("unchecked")
    private CompletableFuture<OperationOutcome> detmVnfName() {
        // if the onset is enriched with the vnf-id, we don't need an A&AI response
        targetEntity = eventContext.getEnrichment().get(GENERIC_VNF_VNF_ID);
        if (targetEntity != null) {
            return null;
        }

        // vnf-id was not in the onset - obtain it via the custom query

        // @formatter:off
        ControlLoopOperationParams cqparams = params.toBuilder()
                        .actor(AaiConstants.ACTOR_NAME)
                        .operation(AaiCqResponse.OPERATION)
                        .targetEntity("")
                        .build();
        // @formatter:on

        // perform custom query and then extract the VNF ID from it
        return taskUtil.sequence(() -> eventContext.obtain(AaiCqResponse.CONTEXT_KEY, cqparams),
                        this::extractVnfFromCq);
    }

    /**
     * Extracts the VNF Name target entity from the custom query data.
     *
     * @return {@code null}
     */
    private CompletableFuture<OperationOutcome> extractVnfFromCq() {
        // already have the CQ data
        AaiCqResponse cq = eventContext.getProperty(AaiCqResponse.CONTEXT_KEY);
        if (cq.getDefaultGenericVnf() == null) {
            throw new IllegalArgumentException("No vnf-id found");
        }

        targetEntity = cq.getDefaultGenericVnf().getVnfId();
        if (targetEntity == null) {
            throw new IllegalArgumentException("No vnf-id found");
        }

        return null;
    }
}