aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java
blob: 99677fb8115c0e2446e5afc940a378fd582c6d88 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*
 * ============LICENSE_START=======================================================
 * ONAP PAP
 * ================================================================================
 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.pap.main.comm;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manager of timers. All of the timers for a given manager have the same wait time, which
 * makes it possible to use a linked hash map to track the timers. As a result, timers can
 * be quickly added and removed. In addition, the expiration time of any new timer is
 * always greater than or equal to the timers that are already in the map (provided that
 * {@link #currentTimeMillis()} never goes backwards). Consequently, the map's iterator
 * will go in ascending order from the minimum expiration time to maximum expiration time.
 *
 * <p>Timers may be registered and cancelled from any thread; all access to the timer map
 * is guarded by a single internal lock. Timer actions are executed on the thread that
 * invokes {@link #run()}, outside of that lock.
 *
 * <p>This class has not been tested for multiple threads invoking {@link #run()}
 * simultaneously.
 */
public class TimerManager implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(TimerManager.class);

    /**
     * Name of this manager, used for logging purposes.
     */
    private final String name;

    /**
     * Time, in milliseconds, that each new timer should wait.
     */
    private final long waitTimeMs;

    /**
     * When the map is empty, the timer thread will block waiting for this semaphore. When
     * a new timer is added to the map, the semaphore will be released, thus allowing the
     * timer thread to progress.
     */
    private final Semaphore sem = new Semaphore(0);

    /**
     * This is decremented to indicate that this manager should be stopped.
     */
    private final CountDownLatch stopper = new CountDownLatch(1);

    /**
     * Used to lock updates to the map.
     */
    private final Object lockit = new Object();

    /**
     * Maps a timer name to a timer. Insertion order is also expiration order.
     */
    private final Map<String, Timer> name2timer = new LinkedHashMap<>();

    /**
     * Constructs the object.
     *
     * @param name name of this manager, used for logging purposes
     * @param waitTimeMs time, in milliseconds, that each new timer should wait
     */
    public TimerManager(String name, long waitTimeMs) {
        this.name = name;
        this.waitTimeMs = waitTimeMs;
    }

    /**
     * Stops the timer thread.
     */
    public void stop() {
        logger.info("timer manager {} stopping", name);

        // Note: Must decrement the latch BEFORE releasing the semaphore, so that the
        // timer thread sees the "stopped" state as soon as it wakes up
        stopper.countDown();
        sem.release();
    }

    /**
     * Registers a timer with the given name. When the timer expires, it is automatically
     * unregistered and then executed. If a timer with the same name is already
     * registered, then it is replaced by the new timer.
     *
     * @param timerName name of the timer to register
     * @param action action to take when the timer expires; the "timerName" is passed as
     *        the only argument
     * @return the timer
     */
    public Timer register(String timerName, Consumer<String> action) {

        synchronized (lockit) {
            // created while holding the lock so that expiration times are assigned in
            // the same order in which the timers are added to the map
            final Timer timer = new Timer(timerName, action);

            // always remove existing entry so that new entry goes at the end of the map
            name2timer.remove(timerName);
            name2timer.put(timerName, timer);

            logger.info("{} timer registered {}", name, timer);

            // release the timer thread in case it's waiting
            sem.release();

            return timer;
        }
    }

    /**
     * Continuously processes timers until {@link #stop()} is invoked or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        logger.info("timer manager {} started", name);

        while (stopper.getCount() > 0) {

            try {
                // block until something is registered (or stop() is called), then
                // discard any extra permits as processTimers() handles ALL timers
                sem.acquire();
                sem.drainPermits();

                processTimers();

            } catch (InterruptedException e) {
                logger.warn("timer manager {} stopping due to interrupt", name);
                stopper.countDown();
                Thread.currentThread().interrupt();
            }
        }

        logger.info("timer manager {} stopped", name);
    }

    /**
     * Process all timers, continuously, as long as a timer remains in the map (and
     * {@link #stop()} has not been called).
     *
     * @throws InterruptedException if the thread is interrupted
     */
    private void processTimers() throws InterruptedException {
        while (stopper.getCount() > 0) {
            final Timer timer = getNextTimer();
            if (timer == null) {
                // nothing left to process
                return;
            }

            processTimer(timer);
        }
    }

    /**
     * Gets the timer that will expire first.
     *
     * @return the timer that will expire first, or {@code null} if there are no timers
     */
    private Timer getNextTimer() {

        synchronized (lockit) {
            if (name2timer.isEmpty()) {
                return null;
            }

            // the first entry in the map is the oldest, and thus the first to expire
            return name2timer.values().iterator().next();
        }
    }

    /**
     * Process a timer, waiting until it expires, unregistering it, and then executing its
     * action. The action is not executed if the manager is stopped, or if the timer is
     * cancelled or replaced, while waiting.
     *
     * @param timer timer to process
     * @throws InterruptedException if the thread is interrupted
     */
    private void processTimer(Timer timer) throws InterruptedException {
        timer.await();

        if (stopper.getCount() == 0) {
            // stop() was called
            return;
        }

        if (!timer.cancel("expired")) {
            // timer was cancelled or replaced while we were waiting
            return;
        }

        // run the timer - a misbehaving action must not kill the timer thread
        try {
            timer.runner.accept(timer.name);
        } catch (RuntimeException e) {
            logger.warn("{} timer threw an exception {}", name, timer, e);
        }
    }

    /**
     * Adds two values, clamping the result to the range of a {@code long} instead of
     * silently wrapping around.
     *
     * @param first first value to add
     * @param second second value to add
     * @return the sum, or {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE} if the sum
     *         would overflow / underflow
     */
    private static long saturatedAdd(long first, long second) {
        final long sum = first + second;

        // overflow occurred iff both operands have the same sign and the sign of the
        // result differs from it
        if (((first ^ sum) & (second ^ sum)) < 0) {
            return (first < 0 ? Long.MIN_VALUE : Long.MAX_VALUE);
        }

        return sum;
    }

    /**
     * Timer info.
     */
    public class Timer {
        /**
         * The timer's name.
         */
        private final String name;

        /**
         * Time, in milliseconds, when the timer will expire.
         */
        private final long expireMs;

        /**
         * Action to take when the timer expires.
         */
        private final Consumer<String> runner;


        /**
         * Constructs the timer, setting its expiration time to the manager's wait time
         * past the current time.
         *
         * @param name the timer's name
         * @param runner2 action to take when the timer expires
         */
        private Timer(String name, Consumer<String> runner2) {
            this.name = name;
            // saturate rather than wrap: a huge wait time must not yield an expiration
            // time in the past
            this.expireMs = saturatedAdd(waitTimeMs, currentTimeMillis());
            this.runner = runner2;
        }

        /**
         * Waits for the timer to expire, returning early if the manager is stopped.
         *
         * @throws InterruptedException if the thread is interrupted
         */
        private void await() throws InterruptedException {
            final long now = currentTimeMillis();
            if (expireMs <= now) {
                // already expired - no need to wait
                return;
            }

            // expireMs > now, thus a negative difference can only be due to overflow
            long tleft = expireMs - now;
            if (tleft < 0) {
                tleft = Long.MAX_VALUE;
            }

            logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this);
            sleep(tleft);
        }

        /**
         * Cancels the timer.
         *
         * @return {@code true} if the timer was cancelled, {@code false} if the timer was
         *         not running
         */
        public boolean cancel() {
            return cancel("cancelled");
        }

        /**
         * Cancels the timer, removing it from the map only if the entry registered under
         * its name is this very timer.
         *
         * @param cancelMsg message to log if the timer is successfully
         *        cancelled
         * @return {@code true} if the timer was cancelled, {@code false} if the timer was
         *         not running
         */
        private boolean cancel(String cancelMsg) {

            synchronized (lockit) {
                final AtomicBoolean removed = new AtomicBoolean(false);

                // yields whatever remains registered under the name afterwards
                final Timer remaining = name2timer.computeIfPresent(name, (key, current) -> {

                    if (current != this) {
                        // different timer is in the map - leave it
                        return current;
                    }

                    // returning null removes the entry
                    removed.set(true);
                    return null;
                });

                if (removed.get()) {
                    logger.debug("{} timer {} {}", TimerManager.this.name, cancelMsg, this);
                    return true;
                }

                if (remaining != null) {
                    // have a new timer in the map - ignore "this" timer
                    logger.info("{} timer replaced {}", TimerManager.this.name, this);

                } else {
                    // nothing registered under the name - already cancelled or expired
                    logger.info("{} timer not active {}", TimerManager.this.name, this);
                }

                return false;
            }
        }

        @Override
        public String toString() {
            return "Timer [name=" + name + ", expireMs=" + expireMs + "]";
        }
    }

    // these may be overridden by junit tests

    /**
     * Gets the current time, in milli-seconds.
     *
     * @return the current time, in milli-seconds
     */
    protected long currentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * "Sleeps" for a bit, stopping if {@link #stop()} is invoked.
     *
     * @param timeMs time, in milli-seconds, to sleep
     * @throws InterruptedException if this thread is interrupted while sleeping
     */
    protected void sleep(long timeMs) throws InterruptedException {
        // waiting on the latch, rather than using Thread.sleep(), lets stop() wake us
        stopper.await(timeMs, TimeUnit.MILLISECONDS);
    }
}