aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/test/java/org/onap/policy/pap/main/rest/e2e/End2EndContext.java
blob: c6a674830b52ea8a2876b1468e060e96806532da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
 * ============LICENSE_START=======================================================
 * ONAP PAP
 * ================================================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.pap.main.rest.e2e;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context for end-to-end tests.
 */
public class End2EndContext {
    private static final Logger logger = LoggerFactory.getLogger(End2EndContext.class);

    /**
     * Message placed onto a queue to indicate that a PDP has nothing more to do.
     */
    private static final String DONE = "";

    /**
     * Time, in milliseconds, to wait for everything to complete.
     */
    private static final long WAIT_MS = 10000;

    /**
     * Messages to be sent to PAP. Messages are removed from the queue by the ToPapThread
     * and directly handed off to the NOOP source.
     */
    private final BlockingQueue<String> toPap = new LinkedBlockingQueue<>();

    /**
     * Messages to be sent to the PDPs. Messages are removed from the queue by the
     * ToPdpThread and are given to each PDP to handle.
     */
    private final BlockingQueue<String> toPdps = new LinkedBlockingQueue<>();

    /**
     * List of simulated PDPs.
     */
    @Getter
    private final List<PseudoPdp> pdps = new ArrayList<>();

    /**
     * PAP's topic source.
     */
    private final NoopTopicSource toPapTopic;

    /**
     * Decodes messages read from the {@link #toPdps} queue and dispatches them to the
     * appropriate handler.
     */
    private final MessageTypeDispatcher dispatcher;

    /**
     * Thread that passes messages to PAP.
     */
    private final ToPapThread toPapThread;

    /**
     * Thread that passes messages to PDPs.
     */
    private final ToPdpsThread toPdpsThread;

    /**
     * {@code True} if started, {@code false} if stopped.
     */
    private boolean running = false;

    /**
     * Exception thrown by a coder. Should be {@code null} if all is OK.
     */
    private volatile CoderException exception = null;

    /**
     * Listener for messages written to the PDP-PAP topic.
     */
    private TopicListener topicListener = (infra, topic, text) -> toPdps.add(text);

    private String topicPolicyPdpPap = "pdp-pap-topic";

    /**
     * Constructs the object.
     */
    public End2EndContext() {
        toPapTopic = TopicEndpointManager.getManager().getNoopTopicSource(topicPolicyPdpPap);

        TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).register(topicListener);

        dispatcher = new MessageTypeDispatcher("messageName");
        dispatcher.register(PdpMessageType.PDP_UPDATE.name(), new UpdateListener());
        dispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), new ChangeListener());

        toPapThread = new ToPapThread();
        toPdpsThread = new ToPdpsThread();
    }

    /**
     * Starts the threads that read the "kafka" queues.
     */
    public void startThreads() {
        if (running) {
            throw new IllegalStateException("already running");
        }

        for (Thread thread : new Thread[] {toPapThread, toPdpsThread}) {
            thread.setDaemon(true);
            thread.start();
        }

        running = true;
    }

    /**
     * Waits for the threads to shut down.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void await() throws InterruptedException {
        toPapThread.join(WAIT_MS);
        assertFalse(toPapThread.isAlive());

        PdpModifyRequestMap map = Registry.get(PapConstants.REG_PDP_MODIFY_MAP);
        assertTrue(map.isEmpty());

        // no more requests, thus we can tell the other thread to stop
        toPdps.add(DONE);

        toPdpsThread.join(WAIT_MS);
        assertFalse(toPapThread.isAlive());

        // nothing new should have been added to the PAP queue
        assertTrue(toPap.isEmpty());

        assertNull(exception);
    }

    /**
     * Stops the threads and shuts down the PAP Activator, rest services, and topic end
     * points.
     */
    public void stop() {
        if (!running) {
            throw new IllegalStateException("not running");
        }

        running = false;

        // queue up a "done" message for each PDP
        toPdps.clear();
        pdps.forEach(pdp -> toPdps.add(DONE));

        // queue up a "done" message for each PDP
        toPap.clear();
        pdps.forEach(pdp -> toPap.add(DONE));

        TopicEndpointManager.getManager().getNoopTopicSink(topicPolicyPdpPap).unregister(topicListener);
    }

    /**
     * Adds a simulated PDP. This must be called before {@link #startThreads()} is
     * invoked.
     *
     * @param pdpName PDP name
     * @param pdpType PDP type
     * @return a new, simulated PDP
     * @throws IllegalStateException if {@link #startThreads()} has already been invoked
     */
    public PseudoPdp addPdp(String pdpName, String pdpType) {
        if (running) {
            throw new IllegalStateException("not running");
        }

        PseudoPdp pdp = new PseudoPdp(pdpName);
        pdps.add(pdp);

        return pdp;
    }

    /**
     * Thread that reads messages from the {@link End2EndContext#toPdps} queue and
     * dispatches them to each PDP. This thread terminates as soon as it sees a
     * {@link End2EndContext#DONE} message.
     */
    private class ToPdpsThread extends Thread {
        @Override
        public void run() {
            for (;;) {
                String text;
                try {
                    text = toPdps.take();
                } catch (InterruptedException e) {
                    logger.warn("{} interrupted", ToPdpsThread.class.getName(), e);
                    Thread.currentThread().interrupt();
                    break;
                }

                if (DONE.equals(text)) {
                    break;
                }

                dispatcher.onTopicEvent(CommInfrastructure.NOOP, topicPolicyPdpPap, text);
            }
        }
    }

    /**
     * Thread that reads messages from the {@link End2EndContext#toPap} queue and passes
     * them to the PAP's topic source. This thread terminates once it sees a
     * {@link End2EndContext#DONE} message <i>for each PDP</i>.
     */
    private class ToPapThread extends Thread {

        @Override
        public void run() {
            // pretend we received DONE from PDPs that are already finished
            long ndone = pdps.stream().filter(pdp -> pdp.finished).count();

            while (ndone < pdps.size()) {
                String text;
                try {
                    text = toPap.take();
                } catch (InterruptedException e) {
                    logger.warn("{} interrupted", ToPapThread.class.getName(), e);
                    Thread.currentThread().interrupt();
                    break;
                }

                if (DONE.equals(text)) {
                    ++ndone;

                } else {
                    toPapTopic.offer(text);
                }
            }
        }
    }

    /**
     * Listener for PdpUpdate messages received from PAP. Invokes
     * {@link PseudoPdp#handle(PdpUpdate)} for each PDP.
     */
    private class UpdateListener extends ScoListener<PdpUpdate> {
        public UpdateListener() {
            super(PdpUpdate.class);
        }

        @Override
        public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpUpdate update) {
            pdps.forEach(pdp -> pdp.handle(update));
        }
    }

    /**
     * Listener for PdpStateChange messages received from PAP. Invokes
     * {@link PseudoPdp#handle(PdpStateChange)} for each PDP.
     */
    private class ChangeListener extends ScoListener<PdpStateChange> {
        public ChangeListener() {
            super(PdpStateChange.class);
        }

        @Override
        public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco,
                        PdpStateChange change) {
            pdps.forEach(pdp -> pdp.handle(change));
        }
    }

    /**
     * Simulated PDP. Each PDP handles messages from the PAP and can return replies in
     * response to those messages. The replies must be queued up before
     * {@link End2EndContext#startThreads()} is invoked.
     */
    public class PseudoPdp {
        private final String name;

        private final Coder coder = new StandardCoder();
        private final Queue<PdpStatus> replies = new LinkedList<>();

        /**
         * Messages that this PDP has handled.
         */
        @Getter
        private final Queue<PdpMessage> handled = new ConcurrentLinkedQueue<>();

        private volatile String group = null;
        private volatile String subgroup = null;

        private volatile boolean finished = true;

        /**
         * Constructs the object.
         *
         * @param name PDP name
         */
        private PseudoPdp(String name) {
            this.name = name;
        }

        public PseudoPdp setGroup(String group) {
            this.group = group;
            return this;
        }

        public PseudoPdp setSubgroup(String subgroup) {
            this.subgroup = subgroup;
            return this;
        }

        /**
         * Adds a reply to the list of replies that will be returned in response to
         * messages from the PAP.
         *
         * @param reply reply to be added to the list
         * @return this PDP
         */
        public PseudoPdp addReply(PdpStatus reply) {
            replies.add(reply);
            finished = false;
            return this;
        }

        /**
         * Handles an UPDATE message, recording the information extracted from the message
         * and queuing up a reply, if any.
         *
         * @param message message that was received from PAP
         */
        private void handle(PdpUpdate message) {
            if (message.appliesTo(name, group, subgroup)) {
                handled.add(message);
                group = message.getPdpGroup();
                subgroup = message.getPdpSubgroup();
                reply(message);
            }
        }

        /**
         * Handles a STAT-CHANGE message. Queues up a reply, if any.
         *
         * @param message message that was received from PAP
         */
        private void handle(PdpStateChange message) {
            if (message.appliesTo(name, group, subgroup)) {
                handled.add(message);
                reply(message);
            }
        }

        /**
         * Queues up the next reply. If there are no more replies, then it queues up a
         * {@link End2EndContext#DONE} message.
         *
         * @param message the message to which a reply should be sent
         */
        private void reply(PdpMessage message) {
            PdpStatus status = replies.poll();
            if (status == null) {
                return;
            }

            PdpResponseDetails response = new PdpResponseDetails();
            response.setResponseTo(message.getRequestId());
            status.setResponse(response);

            toPap.add(toJson(status));

            if (replies.isEmpty()) {
                finished = true;
                toPap.add(DONE);
            }
        }

        /**
         * Converts a message to JSON.
         *
         * @param status message to be converted
         * @return JSON representation of the message
         */
        private String toJson(PdpStatus status) {
            try {
                return coder.encode(status);

            } catch (CoderException e) {
                exception = e;
                return DONE;
            }
        }

        @Override
        public String toString() {
            return name;
        }
    }
}