/*
* ============LICENSE_START=======================================================
* ONAP PAP
* ================================================================================
* Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.pap.main.comm;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manager of timers. All of the timers for a given manager have the same wait time, which
* makes it possible to use a linked hash map to track the timers. As a result, timers can
* be quickly added and removed. In addition, the expiration time of any new timer is
* always greater than or equal to the timers that are already in the map. Consequently,
* the map's iterator will go in ascending order from the minimum expiration time to
* maximum expiration time.
*
*
This class has not been tested for multiple threads invoking {@link #run()}
* simultaneously.
*/
public class TimerManager implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(TimerManager.class);
/**
* Name of this manager, used for logging purposes.
*/
private final String name;
/**
* Time that each new timer should wait.
*/
private final long waitTimeMs;
/**
* When the map is empty, the timer thread will block waiting for this semaphore. When
* a new timer is added to the map, the semaphore will be released, thus allowing the
* timer thread to progress.
*/
private final Semaphore sem = new Semaphore(0);
/**
* This is decremented to indicate that this manager should be stopped.
*/
private final CountDownLatch stopper = new CountDownLatch(1);
/**
* Used to lock updates to the map.
*/
private final Object lockit = new Object();
/**
* Maps a timer name to a timer.
*/
private final Map name2timer = new LinkedHashMap<>();
/**
* Constructs the object.
*
* @param name name of this manager, used for logging purposes
* @param waitTimeMs time that each new timer should wait
*/
public TimerManager(String name, long waitTimeMs) {
this.name = name;
this.waitTimeMs = waitTimeMs;
}
/**
* Stops the timer thread.
*/
public void stop() {
logger.info("timer manager {} stopping", name);
// Note: Must decrement the latch BEFORE releasing the semaphore
stopper.countDown();
sem.release();
}
/**
* Registers a timer with the given name. When the timer expires, it is automatically
* unregistered and then executed.
*
* @param timerName name of the timer to register
* @param action action to take when the timer expires; the "timerName" is passed as
* the only argument
* @return the timer
*/
public Timer register(String timerName, Consumer action) {
synchronized (lockit) {
Timer timer = new Timer(timerName, action);
// always remove existing entry so that new entry goes at the end of the map
name2timer.remove(timerName);
name2timer.put(timerName, timer);
logger.info("{} timer registered {}", name, timer);
// release the timer thread in case it's waiting
sem.release();
return timer;
}
}
/**
* Continuously processes timers until {@link #stop()} is invoked.
*/
@Override
public void run() {
logger.info("timer manager {} started", name);
while (stopper.getCount() > 0) {
try {
sem.acquire();
sem.drainPermits();
processTimers();
} catch (InterruptedException e) {
logger.warn("timer manager {} stopping due to interrupt", name);
stopper.countDown();
Thread.currentThread().interrupt();
}
}
logger.info("timer manager {} stopped", name);
}
/**
* Process all timers, continuously, as long as a timer remains in the map (and
* {@link #stop()} has not been called).
*
* @throws InterruptedException if the thread is interrupted
*/
private void processTimers() throws InterruptedException {
Timer timer;
while ((timer = getNextTimer()) != null && stopper.getCount() > 0) {
processTimer(timer);
}
}
/**
* Gets the timer that will expire first.
*
* @return the timer that will expire first, or {@code null} if there are no timers
*/
private Timer getNextTimer() {
synchronized (lockit) {
if (name2timer.isEmpty()) {
return null;
}
// use an iterator to get the first timer in the map
return name2timer.values().iterator().next();
}
}
/**
* Process a timer, waiting until it expires, unregistering it, and then executing its
* action.
*
* @param timer timer to process
* @throws InterruptedException if the thread is interrupted
*/
private void processTimer(Timer timer) throws InterruptedException {
timer.await();
if (stopper.getCount() == 0) {
// stop() was called
return;
}
if (!timer.cancel("expired")) {
// timer was cancelled while we were waiting
return;
}
// run the timer
try {
timer.runner.accept(timer.name);
} catch (RuntimeException e) {
logger.warn("{} timer threw an exception {}", TimerManager.this.name, timer, e);
}
}
/**
* Timer info.
*/
public class Timer {
/**
* The timer's name.
*/
private String name;
/**
* Time, in milliseconds, when the timer will expire.
*/
private long expireMs;
/**
* Action to take when the timer expires.
*/
private Consumer runner;
private Timer(String name, Consumer runner2) {
this.name = name;
this.expireMs = waitTimeMs + currentTimeMillis();
this.runner = runner2;
}
private void await() throws InterruptedException {
// wait for it to expire, if necessary
long tleft = expireMs - currentTimeMillis();
if (tleft > 0) {
logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this);
sleep(tleft);
}
}
/**
* Cancels the timer.
*
* @return {@code true} if the timer was cancelled, {@code false} if the timer was
* not running
*/
public boolean cancel() {
return cancel("cancelled");
}
/**
* Cancels the timer.
*
* @param cancelMsg message to log if the timer is successfully
* cancelled
* @return {@code true} if the timer was cancelled, {@code false} if the timer was
* not running
*/
private boolean cancel(String cancelMsg) {
synchronized (lockit) {
AtomicBoolean wasPresent = new AtomicBoolean(false);
name2timer.computeIfPresent(name, (key, val) -> {
if (val == this) {
wasPresent.set(true);
return null;
} else {
// different timer is in the map - leave it
return val;
}
});
if (!wasPresent.get()) {
// have a new timer in the map - ignore "this" timer
logger.info("{} timer replaced {}", TimerManager.this.name, this);
return false;
}
logger.debug("{} timer " + cancelMsg + " {}", TimerManager.this.name, this);
return true;
}
}
@Override
public String toString() {
return "Timer [name=" + name + ", expireMs=" + expireMs + "]";
}
}
// these may be overridden by junit tests
/**
* Gets the current time, in milli-seconds.
*
* @return the current time, in milli-seconds
*/
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
/**
* "Sleeps" for a bit, stopping if {@link #stop()} is invoked.
*
* @param timeMs time, in milli-seconds, to sleep
* @throws InterruptedException if this thread is interrupted while sleeping
*/
protected void sleep(long timeMs) throws InterruptedException {
stopper.await(timeMs, TimeUnit.MILLISECONDS);
}
}