aboutsummaryrefslogtreecommitdiffstats
path: root/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
diff options
context:
space:
mode:
Diffstat (limited to 'utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java')
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java364
1 files changed, 253 insertions, 111 deletions
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
index b37e49e0..f52105ed 100644
--- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
@@ -1,6 +1,6 @@
-/*
+/*-
* ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,184 +20,326 @@
package org.onap.policy.common.utils.time;
-import java.util.Date;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.PriorityQueue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* "Current" time, when running junit tests in multiple threads. This is intended to be
* injected into classes under test, to replace their {@link CurrentTime} objects. The
- * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
- * of "current" time forward, allowing threads to resume, as the end of their sleep time
- * is reached. Additional threads do not resume until all threads have once again entered
- * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
- * thread will not re-enter {@link #sleep(long)}.
+ * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep
+ * time. A queue of work items is maintained, sorted by the time for which the items are
+ * scheduled to execute. Tasks are executed by the test/controlling thread when one of the
+ * waitXxx() methods is invoked. {@link PseudoTimer} and
+ * {@link PseudoScheduledExecutorService} add work items to the queue.
+ *
+ * <p/>
+ * This only handles relatively simple situations, though it does support multi-threaded
+ * testing.
*/
-public class TestTimeMulti extends CurrentTime {
+public class TestTimeMulti extends TestTime {
+ private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class);
+
+ public static final String NEVER_SATISFIED = "condition was never satisfied";
+
+ /**
+ * Default value, in milliseconds, to wait for an item to be added to the queue.
+ */
+ public static final long DEFAULT_MAX_WAIT_MS = 5000L;
/**
- * Number of threads that will be sleeping simultaneously.
+ * Maximum time that the test thread should wait for something to be added to its work
+ * queue.
*/
- private int nthreads;
+ @Getter
+ private final long maxWaitMs;
/**
- * "Current" time, in milliseconds, used by tests.
+ * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}.
*/
- private long tcur = System.currentTimeMillis();
+ private final PriorityQueue<WorkItem> queue =
+ new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs()));
/**
- * Queue of sleeping threads waiting to be awakened.
+ * Lock used when modifying the queue.
*/
- private final PriorityQueue<Info> queue = new PriorityQueue<>();
+ private final Object updateLock = new Object();
/**
- * Used to synchronize updates.
+ * Constructs the object using the default maximum wait time.
*/
- private final Object locker = new Object();
+ public TestTimeMulti() {
+ this(DEFAULT_MAX_WAIT_MS);
+ }
/**
- * Constructor.
+ * Constructs the object.
*
- * @param nthreads number of threads that will be sleeping simultaneously
+ * @param maxWaitMs maximum time that the test thread should wait for something to be
+ * added to its work queue
*/
- public TestTimeMulti(int nthreads) {
- this.nthreads = nthreads;
+ public TestTimeMulti(long maxWaitMs) {
+ this.maxWaitMs = maxWaitMs;
}
- @Override
- public long getMillis() {
- return tcur;
+ /**
+ * Determines if the task queue is empty.
+ *
+ * @return {@code true} if the task queue is empty, {@code false} otherwise
+ */
+ public boolean isEmpty() {
+ synchronized (updateLock) {
+ purgeItems();
+ return queue.isEmpty();
+ }
}
- @Override
- public Date getDate() {
- return new Date(tcur);
+ /**
+ * Gets the number of tasks in the work queue.
+ *
+ * @return the number of tasks in the work queue
+ */
+ public int queueLength() {
+ synchronized (updateLock) {
+ purgeItems();
+ return queue.size();
+ }
}
- @Override
- public void sleep(long sleepMs) throws InterruptedException {
- if (sleepMs <= 0) {
- return;
+ /**
+ * Indicates that this will no longer be used. Interrupts any threads that are waiting
+ * for their "sleep()" to complete.
+ */
+ public void destroy() {
+ synchronized (updateLock) {
+ queue.forEach(WorkItem::interrupt);
+ queue.clear();
}
+ }
- Info info = new Info(tcur + sleepMs);
+ /**
+ * Runs a single task from the queue.
+ *
+ * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+ * than pseudo time
+ *
+ * @return {@code true} if a task was run, {@code false} if the queue was empty
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public boolean runOneTask(long waitMs) throws InterruptedException {
+ WorkItem item = pollQueue(waitMs);
+ if (item == null) {
+ return false;
+ }
- synchronized (locker) {
- queue.add(info);
+ runItem(item);
+ return true;
+ }
- if (queue.size() == nthreads) {
- // all threads are now sleeping - wake one up
- wakeThreads();
+ /**
+ * Waits for the pseudo time to reach a certain point. Executes work items until the
+ * time is reached.
+ *
+ * @param waitMs pseudo time, in milliseconds, for which to wait
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public void waitFor(long waitMs) throws InterruptedException {
+ // pseudo time for which we're waiting
+ long tend = getMillis() + waitMs;
+
+ while (getMillis() < tend) {
+ if (!runOneTask(maxWaitMs)) {
+ /*
+ * Waited the maximum poll time and nothing has happened, so we'll just
+ * bump the time directly.
+ */
+ super.sleep(tend - getMillis());
+ break;
}
}
-
- // this MUST happen outside of the "synchronized" block
- info.await();
}
/**
- * Indicates that a thread has terminated or that it will no longer be invoking
- * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
- * removing the terminated thread.
+ * Waits for a condition to become true. Executes work items until the given condition
+ * is true.
*
- * @throws IllegalStateException if the queue is already full
+ * @param condition condition to be checked
*/
- public void threadCompleted() {
- synchronized (locker) {
- int sz = queue.size();
- if (sz >= nthreads) {
- throw new IllegalStateException("too many threads still sleeping");
- }
+ public void waitUntil(Callable<Boolean> condition) {
+ try {
+ // real time for which we're waiting
+ long realEnd = System.currentTimeMillis() + maxWaitMs;
- --nthreads;
+ while (System.currentTimeMillis() < realEnd) {
+ if (condition.call()) {
+ return;
+ }
- if (sz == nthreads) {
- // after removing terminated thread - queue is now full; awaken something
- wakeThreads();
+ runOneTask(100);
}
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("interrupted while waiting for condition", e);
+ fail("interrupted while waiting for condition: " + e.getMessage());
+
+ } catch (Exception e) {
+ logger.error("condition evaluator threw an exception", e);
+ fail("condition evaluator threw an exception: " + e.getMessage());
}
+
+ fail(NEVER_SATISFIED);
}
/**
- * Advances the "current" time and awakens any threads sleeping until that time.
+ * Waits for a condition to become true. Executes work items until the given condition
+ * is true or the maximum wait time is reached.
+ *
+ * @param twait maximum, pseudo time to wait
+ * @param units time units represented by "twait"
+ * @param condition condition to be checked
*/
- private void wakeThreads() {
- Info info = queue.poll();
- if (info == null) {
- return;
- }
+ public void waitUntil(long twait, TimeUnit units, Callable<Boolean> condition) {
+ // pseudo time for which we're waiting
+ long tend = getMillis() + units.toMillis(twait);
- tcur = info.getAwakenAtMs();
- info.wake();
+ waitUntil(() -> {
+ if (getMillis() >= tend) {
+ fail(NEVER_SATISFIED);
+ }
- while ((info = queue.poll()) != null) {
- if (tcur == info.getAwakenAtMs()) {
- info.wake();
+ return condition.call();
+ });
+ }
- } else {
- // not ready to wake this thread - put it back in the queue
- queue.add(info);
- break;
+ /**
+ * Gets one item from the work queue.
+ *
+ * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+ * than pseudo time
+ * @return the first item in the queue, or {@code null} if no item was added to the
+ * queue before the wait time expired
+ * @throws InterruptedException if the current thread was interrupted
+ */
+ private WorkItem pollQueue(long waitMs) throws InterruptedException {
+ long realEnd = System.currentTimeMillis() + waitMs;
+ WorkItem work;
+
+ synchronized (updateLock) {
+ while ((work = queue.poll()) == null) {
+ updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis()));
+
+ if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) {
+ return null;
+ }
}
}
+
+ return work;
}
/**
- * Info about a sleeping thread.
+ * Runs a work item.
+ *
+ * @param work work item to be run
+ * @throws InterruptedException if the current thread was interrupted
*/
- private static class Info implements Comparable<Info> {
-
- /**
- * Time, in milliseconds, at which the associated thread should awaken.
- */
- private final long awakenAtMs;
+ private void runItem(WorkItem work) throws InterruptedException {
+ if (work.wasCancelled()) {
+ logger.info("work item was canceled {}", work);
+ return;
+ }
- /**
- * This is triggered when the associated thread should awaken.
- */
- private final CountDownLatch latch = new CountDownLatch(1);
+ // update the pseudo time
+ super.sleep(work.getNextMs() - getMillis());
- /**
- * Constructor.
- *
- * @param awakenAtMs time, in milliseconds, at which the associated thread should
- * awaken
+ /*
+ * Add it back into the queue if appropriate, in case cancel() is called while
+ * it's executing.
*/
- public Info(long awakenAtMs) {
- this.awakenAtMs = awakenAtMs;
+ if (work.bumpNextTime()) {
+ logger.info("re-enqueuing work item");
+ enqueue(work);
}
- public long getAwakenAtMs() {
- return awakenAtMs;
+ logger.info("fire work item {}", work);
+ work.fire();
+ }
+
+ @Override
+ public void sleep(long sleepMs) throws InterruptedException {
+ if (sleepMs <= 0) {
+ return;
}
- /**
- * Awakens the associated thread by decrementing its latch.
- */
- public void wake() {
- latch.countDown();
+ SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread());
+ enqueue(item);
+
+ // wait for the item to fire
+ logger.info("sleeping {}", item);
+ item.await();
+ logger.info("done sleeping {}", Thread.currentThread());
+ }
+
+ /**
+ * Adds an item to the {@link #queue}.
+ *
+ * @param item item to be added
+ */
+ protected void enqueue(WorkItem item) {
+ logger.info("enqueue work item {}", item);
+ synchronized (updateLock) {
+ queue.add(item);
+ updateLock.notify();
}
+ }
- /**
- * Blocks the current thread until awakened (i.e., until its latch is
- * decremented).
- *
- * @throws InterruptedException can be interrupted
- */
- public void await() throws InterruptedException {
- latch.await();
+ /**
+ * Cancels work items by removing them from the queue if they're associated with the
+ * specified object.
+ *
+ * @param associate object whose associated items are to be cancelled
+ * @return list of items that were canceled
+ */
+ protected List<WorkItem> cancelItems(Object associate) {
+ List<WorkItem> items = new LinkedList<>();
+
+ synchronized (updateLock) {
+ Iterator<WorkItem> iter = queue.iterator();
+ while (iter.hasNext()) {
+ WorkItem item = iter.next();
+ if (item.isAssociatedWith(associate)) {
+ iter.remove();
+ items.add(item);
+ }
+ }
}
- @Override
- public int compareTo(Info object) {
- int diff = Long.compare(awakenAtMs, object.awakenAtMs);
+ return items;
+ }
- // this assumes that Object.toString() is unique for each Info object
- if (diff == 0) {
- diff = this.toString().compareTo(object.toString());
+ /**
+ * Purges work items that are known to have been canceled. (Does not remove canceled
+ * TimerTasks, as there is no way via the public API to determine if the task has been
+ * canceled.)
+ */
+ public void purgeItems() {
+ synchronized (updateLock) {
+ Iterator<WorkItem> iter = queue.iterator();
+ while (iter.hasNext()) {
+ if (iter.next().wasCancelled()) {
+ iter.remove();
+ }
}
- return diff;
}
-
}
}