aboutsummaryrefslogtreecommitdiffstats
path: root/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
blob: 2782eb721710ebecbc8d0ae3b0117b40f45bbab4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/*
 * ============LICENSE_START=======================================================
 * Common Utils-Test
 * ================================================================================
 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.utils.time;

import java.util.Date;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;

/**
 * "Current" time, when running junit tests in multiple threads. This is intended to be
 * injected into classes under test, to replace their {@link CurrentTime} objects. The
 * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
 * of "current" time forward, allowing threads to resume, as the end of their sleep time
 * is reached. Additional threads do not resume until all threads have once again entered
 * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
 * thread will not re-enter {@link #sleep(long)}.
 *
 * <p>All updates to the queue, the thread count and the "current" time are made while
 * holding an internal lock. The "current" time is additionally {@code volatile} so that
 * {@link #getMillis()} and {@link #getDate()} may read it without taking the lock.
 */
public class TestTimeMulti extends CurrentTime {

    /**
     * Number of threads that will be sleeping simultaneously. Decremented by
     * {@link #threadCompleted()}; only read or modified while holding {@link #locker}
     * (after construction).
     */
    private int nthreads;

    /**
     * "Current" time, in milliseconds, used by tests. Written only while holding
     * {@link #locker}; volatile so that lock-free readers see the latest value.
     */
    private volatile long tcur = System.currentTimeMillis();

    /**
     * Queue of sleeping threads waiting to be awakened, ordered by wake-up time. Only
     * accessed while holding {@link #locker}.
     */
    private final PriorityQueue<Info> queue = new PriorityQueue<>();

    /**
     * Used to synchronize updates.
     */
    private final Object locker = new Object();

    /**
     * Constructor.
     *
     * @param nthreads number of threads that will be sleeping simultaneously
     */
    public TestTimeMulti(int nthreads) {
        this.nthreads = nthreads;
    }

    /**
     * Gets the simulated "current" time.
     *
     * @return the "current" time, in milliseconds
     */
    @Override
    public long getMillis() {
        return tcur;
    }

    /**
     * Gets the simulated "current" time, as a date.
     *
     * @return a new date representing the "current" time
     */
    @Override
    public Date getDate() {
        return new Date(tcur);
    }

    /**
     * Blocks the calling thread until the simulated time has been advanced to the end of
     * its sleep period. Returns immediately if the sleep time is not positive. If this
     * call causes the queue to become full (i.e., all threads are now sleeping), then the
     * "current" time is advanced and the thread(s) with the earliest wake-up time are
     * released.
     *
     * @param sleepMs time to sleep, in milliseconds
     * @throws InterruptedException if the thread is interrupted while waiting; the thread
     *         is no longer counted as sleeping once this is thrown
     */
    @Override
    public void sleep(long sleepMs) throws InterruptedException {
        if (sleepMs <= 0) {
            return;
        }

        Info info;

        synchronized (locker) {
            // compute the wake-up time under the lock so that it is based on the same
            // "current" time that wakeThreads() sees
            info = new Info(tcur + sleepMs);
            queue.add(info);

            if (queue.size() == nthreads) {
                // all threads are now sleeping - wake one up
                wakeThreads();
            }
        }

        // this MUST happen outside of the "synchronized" block
        try {
            info.await();

        } catch (InterruptedException e) {
            // this thread is no longer sleeping - make sure it isn't counted as such.
            // This is a no-op if the entry has already been polled by wakeThreads().
            synchronized (locker) {
                queue.remove(info);
            }
            throw e;
        }
    }

    /**
     * Indicates that a thread has terminated or that it will no longer be invoking
     * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
     * removing the terminated thread.
     *
     * @throws IllegalStateException if the queue is already full
     */
    public void threadCompleted() {
        synchronized (locker) {
            int sz = queue.size();
            if (sz >= nthreads) {
                throw new IllegalStateException("too many threads still sleeping");
            }

            --nthreads;

            if (sz == nthreads) {
                // after removing terminated thread - queue is now full; awaken something
                wakeThreads();
            }
        }
    }

    /**
     * Advances the "current" time to the earliest wake-up time in the queue and awakens
     * every thread sleeping until that time. Does nothing if the queue is empty. Must be
     * invoked while holding {@link #locker}.
     */
    private void wakeThreads() {
        Info first = queue.poll();
        if (first == null) {
            return;
        }

        final long now = first.getAwakenAtMs();
        tcur = now;
        first.wake();

        // release everything else that is due at the same instant; anything later
        // simply stays in the queue
        Info next;
        while ((next = queue.peek()) != null && next.getAwakenAtMs() == now) {
            queue.poll().wake();
        }
    }

    /**
     * Info about a sleeping thread.
     */
    private static class Info implements Comparable<Info> {

        /**
         * Time, in milliseconds, at which the associated thread should awaken.
         */
        private final long awakenAtMs;

        /**
         * This is triggered when the associated thread should awaken.
         */
        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * Constructor.
         *
         * @param awakenAtMs time, in milliseconds, at which the associated thread should
         *        awaken
         */
        public Info(long awakenAtMs) {
            this.awakenAtMs = awakenAtMs;
        }

        public long getAwakenAtMs() {
            return awakenAtMs;
        }

        /**
         * Awakens the associated thread by decrementing its latch.
         */
        public void wake() {
            latch.countDown();
        }

        /**
         * Blocks the current thread until awakened (i.e., until its latch is
         * decremented).
         *
         * @throws InterruptedException can be interrupted
         */
        public void await() throws InterruptedException {
            latch.await();
        }

        /**
         * Orders items by wake-up time only. Distinct items with the same wake-up time
         * compare as equal, which is fine for a {@link PriorityQueue} (it permits ties)
         * and matches how they are used: all items due at the same time are awakened
         * together. Note that this ordering is therefore not consistent with
         * {@link Object#equals(Object)}.
         */
        @Override
        public int compareTo(Info object) {
            return Long.compare(awakenAtMs, object.awakenAtMs);
        }

    }
}