diff options
Diffstat (limited to 'utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java')
-rw-r--r-- | utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java | 364 |
1 files changed, 253 insertions, 111 deletions
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java index b37e49e0..f52105ed 100644 --- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java @@ -1,6 +1,6 @@ -/* +/*- * ============LICENSE_START======================================================= - * Common Utils-Test + * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,184 +20,326 @@ package org.onap.policy.common.utils.time; -import java.util.Date; +import static org.junit.Assert.fail; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.PriorityQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * "Current" time, when running junit tests in multiple threads. This is intended to be * injected into classes under test, to replace their {@link CurrentTime} objects. The - * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion - * of "current" time forward, allowing threads to resume, as the end of their sleep time - * is reached. Additional threads do not resume until all threads have once again entered - * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a - * thread will not re-enter {@link #sleep(long)}. + * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep + * time. A queue of work items is maintained, sorted by the time for which the items are + * scheduled to execute. Tasks are executed by the test/controlling thread when one of the + * waitXxx() methods is invoked. {@link PseudoTimer} and + * {@link PseudoScheduledExecutorService} add work items to the queue. + * + * <p/> + * This only handles relatively simple situations, though it does support multi-threaded + * testing. */ -public class TestTimeMulti extends CurrentTime { +public class TestTimeMulti extends TestTime { + private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class); + + public static final String NEVER_SATISFIED = "condition was never satisfied"; + + /** + * Default value, in milliseconds, to wait for an item to be added to the queue. + */ + public static final long DEFAULT_MAX_WAIT_MS = 5000L; /** - * Number of threads that will be sleeping simultaneously. + * Maximum time that the test thread should wait for something to be added to its work + * queue. */ - private int nthreads; + @Getter + private final long maxWaitMs; /** - * "Current" time, in milliseconds, used by tests. + * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}. */ - private long tcur = System.currentTimeMillis(); + private final PriorityQueue<WorkItem> queue = + new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs())); /** - * Queue of sleeping threads waiting to be awakened. + * Lock used when modifying the queue. */ - private final PriorityQueue<Info> queue = new PriorityQueue<>(); + private final Object updateLock = new Object(); /** - * Used to synchronize updates. + * Constructs the object using the default maximum wait time. */ - private final Object locker = new Object(); + public TestTimeMulti() { + this(DEFAULT_MAX_WAIT_MS); + } /** - * Constructor. + * Constructs the object. * - * @param nthreads number of threads that will be sleeping simultaneously + * @param maxWaitMs maximum time that the test thread should wait for something to be + * added to its work queue */ - public TestTimeMulti(int nthreads) { - this.nthreads = nthreads; + public TestTimeMulti(long maxWaitMs) { + this.maxWaitMs = maxWaitMs; } - @Override - public long getMillis() { - return tcur; + /** + * Determines if the task queue is empty. + * + * @return {@code true} if the task queue is empty, {@code false} otherwise + */ + public boolean isEmpty() { + synchronized (updateLock) { + purgeItems(); + return queue.isEmpty(); + } } - @Override - public Date getDate() { - return new Date(tcur); + /** + * Gets the number of tasks in the work queue. + * + * @return the number of tasks in the work queue + */ + public int queueLength() { + synchronized (updateLock) { + purgeItems(); + return queue.size(); + } } - @Override - public void sleep(long sleepMs) throws InterruptedException { - if (sleepMs <= 0) { - return; + /** + * Indicates that this will no longer be used. Interrupts any threads that are waiting + * for their "sleep()" to complete. + */ + public void destroy() { + synchronized (updateLock) { + queue.forEach(WorkItem::interrupt); + queue.clear(); } + } - Info info = new Info(tcur + sleepMs); + /** + * Runs a single task from the queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * + * @return {@code true} if a task was run, {@code false} if the queue was empty + * @throws InterruptedException if the current thread is interrupted + */ + public boolean runOneTask(long waitMs) throws InterruptedException { + WorkItem item = pollQueue(waitMs); + if (item == null) { + return false; + } - synchronized (locker) { - queue.add(info); + runItem(item); + return true; + } - if (queue.size() == nthreads) { - // all threads are now sleeping - wake one up - wakeThreads(); + /** + * Waits for the pseudo time to reach a certain point. Executes work items until the + * time is reached. + * + * @param waitMs pseudo time, in milliseconds, for which to wait + * @throws InterruptedException if the current thread is interrupted + */ + public void waitFor(long waitMs) throws InterruptedException { + // pseudo time for which we're waiting + long tend = getMillis() + waitMs; + + while (getMillis() < tend) { + if (!runOneTask(maxWaitMs)) { + /* + * Waited the maximum poll time and nothing has happened, so we'll just + * bump the time directly. + */ + super.sleep(tend - getMillis()); + break; } } - - // this MUST happen outside of the "synchronized" block - info.await(); } /** - * Indicates that a thread has terminated or that it will no longer be invoking - * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after - * removing the terminated thread. + * Waits for a condition to become true. Executes work items until the given condition + * is true. * - * @throws IllegalStateException if the queue is already full + * @param condition condition to be checked */ - public void threadCompleted() { - synchronized (locker) { - int sz = queue.size(); - if (sz >= nthreads) { - throw new IllegalStateException("too many threads still sleeping"); - } + public void waitUntil(Callable<Boolean> condition) { + try { + // real time for which we're waiting + long realEnd = System.currentTimeMillis() + maxWaitMs; - --nthreads; + while (System.currentTimeMillis() < realEnd) { + if (condition.call()) { + return; + } - if (sz == nthreads) { - // after removing terminated thread - queue is now full; awaken something - wakeThreads(); + runOneTask(100); } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted while waiting for condition", e); + fail("interrupted while waiting for condition: " + e.getMessage()); + + } catch (Exception e) { + logger.error("condition evaluator threw an exception", e); + fail("condition evaluator threw an exception: " + e.getMessage()); } + + fail(NEVER_SATISFIED); } /** - * Advances the "current" time and awakens any threads sleeping until that time. + * Waits for a condition to become true. Executes work items until the given condition + * is true or the maximum wait time is reached. + * + * @param twait maximum, pseudo time to wait + * @param units time units represented by "twait" + * @param condition condition to be checked */ - private void wakeThreads() { - Info info = queue.poll(); - if (info == null) { - return; - } + public void waitUntil(long twait, TimeUnit units, Callable<Boolean> condition) { + // pseudo time for which we're waiting + long tend = getMillis() + units.toMillis(twait); - tcur = info.getAwakenAtMs(); - info.wake(); + waitUntil(() -> { + if (getMillis() >= tend) { + fail(NEVER_SATISFIED); + } - while ((info = queue.poll()) != null) { - if (tcur == info.getAwakenAtMs()) { - info.wake(); + return condition.call(); + }); + } - } else { - // not ready to wake this thread - put it back in the queue - queue.add(info); - break; + /** + * Gets one item from the work queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * @return the first item in the queue, or {@code null} if no item was added to the + * queue before the wait time expired + * @throws InterruptedException if the current thread was interrupted + */ + private WorkItem pollQueue(long waitMs) throws InterruptedException { + long realEnd = System.currentTimeMillis() + waitMs; + WorkItem work; + + synchronized (updateLock) { + while ((work = queue.poll()) == null) { + updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis())); + + if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) { + return null; + } } } + + return work; } /** - * Info about a sleeping thread. + * Runs a work item. + * + * @param work work item to be run + * @throws InterruptedException if the current thread was interrupted */ - private static class Info implements Comparable<Info> { - - /** - * Time, in milliseconds, at which the associated thread should awaken. - */ - private final long awakenAtMs; + private void runItem(WorkItem work) throws InterruptedException { + if (work.wasCancelled()) { + logger.info("work item was canceled {}", work); + return; + } - /** - * This is triggered when the associated thread should awaken. - */ - private final CountDownLatch latch = new CountDownLatch(1); + // update the pseudo time + super.sleep(work.getNextMs() - getMillis()); - /** - * Constructor. - * - * @param awakenAtMs time, in milliseconds, at which the associated thread should - * awaken + /* + * Add it back into the queue if appropriate, in case cancel() is called while + * it's executing. */ - public Info(long awakenAtMs) { - this.awakenAtMs = awakenAtMs; + if (work.bumpNextTime()) { + logger.info("re-enqueuing work item"); + enqueue(work); } - public long getAwakenAtMs() { - return awakenAtMs; + logger.info("fire work item {}", work); + work.fire(); + } + + @Override + public void sleep(long sleepMs) throws InterruptedException { + if (sleepMs <= 0) { + return; } - /** - * Awakens the associated thread by decrementing its latch. - */ - public void wake() { - latch.countDown(); + SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread()); + enqueue(item); + + // wait for the item to fire + logger.info("sleeping {}", item); + item.await(); + logger.info("done sleeping {}", Thread.currentThread()); + } + + /** + * Adds an item to the {@link #queue}. + * + * @param item item to be added + */ + protected void enqueue(WorkItem item) { + logger.info("enqueue work item {}", item); + synchronized (updateLock) { + queue.add(item); + updateLock.notify(); } + } - /** - * Blocks the current thread until awakened (i.e., until its latch is - * decremented). - * - * @throws InterruptedException can be interrupted - */ - public void await() throws InterruptedException { - latch.await(); + /** + * Cancels work items by removing them from the queue if they're associated with the + * specified object. + * + * @param associate object whose associated items are to be cancelled + * @return list of items that were canceled + */ + protected List<WorkItem> cancelItems(Object associate) { + List<WorkItem> items = new LinkedList<>(); + + synchronized (updateLock) { + Iterator<WorkItem> iter = queue.iterator(); + while (iter.hasNext()) { + WorkItem item = iter.next(); + if (item.isAssociatedWith(associate)) { + iter.remove(); + items.add(item); + } + } } - @Override - public int compareTo(Info object) { - int diff = Long.compare(awakenAtMs, object.awakenAtMs); + return items; + } - // this assumes that Object.toString() is unique for each Info object - if (diff == 0) { - diff = this.toString().compareTo(object.toString()); + /** + * Purges work items that are known to have been canceled. (Does not remove canceled + * TimerTasks, as there is no way via the public API to determine if the task has been + * canceled.) + */ + public void purgeItems() { + synchronized (updateLock) { + Iterator<WorkItem> iter = queue.iterator(); + while (iter.hasNext()) { + if (iter.next().wasCancelled()) { + iter.remove(); + } } - return diff; } - } } |