aboutsummaryrefslogtreecommitdiffstats
path: root/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java')
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java501
1 files changed, 435 insertions, 66 deletions
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
index f17235a2..8b6501a7 100644
--- a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
@@ -1,6 +1,6 @@
-/*
+/*-
* ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,109 +20,478 @@
package org.onap.policy.common.utils.time;
-import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
import org.junit.Test;
-/**
- * Class to test TestTimeMulti.
- */
public class TestTimeMultiTest {
+ private static final long SHORT_WAIT_MS = 100L;
+ private static final long DELAY_MS = 500L;
+ private static final long MAX_WAIT_MS = 5000L;
- private static final int NTHREADS = 10;
- private static final int NTIMES = 100;
- private static final long WAIT_SEC = 5L;
- private static final long MIN_SLEEP_MS = 5L;
+ private TestTimeMulti multi;
- private TestTimeMulti ttm;
- private Semaphore done;
+ @Before
+ public void setUp() {
+ multi = new TestTimeMulti();
+ }
@Test
- public void test() throws Exception {
- ttm = new TestTimeMulti(NTHREADS);
- done = new Semaphore(0);
+ public void testSleep() throws InterruptedException {
+ // negative sleep time
+ final long tbegin = multi.getMillis();
+ MyThread thread = new MyThread(-5);
+ thread.start();
- final long tbeg = ttm.getMillis();
+ // should complete without creating a work item
+ assertTrue(thread.await());
+ assertNull(thread.ex);
- // create threads
- List<MyThread> threads = new ArrayList<>(NTHREADS);
- for (int x = 0; x < NTHREADS; ++x) {
- threads.add(new MyThread(x + MIN_SLEEP_MS));
- }
+ // time should not have changed
+ assertEquals(tbegin, multi.getMillis());
+
+
+ // positive sleep time
+ thread = new MyThread(DELAY_MS);
+ thread.start();
+
+ // must execute the SleepItem
+ multi.runOneTask(MAX_WAIT_MS);
+
+ assertTrue(multi.isEmpty());
+ assertTrue(thread.await());
+ assertNull(thread.ex);
+
+ // time SHOULD HAVE changed
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+ }
+
+ @Test
+ public void testTestTimeMulti() {
+ assertTrue(multi.getMaxWaitMs() > 0);
+ }
+
+ @Test
+ public void testTestTimeMultiLong() {
+ assertEquals(200, new TestTimeMulti(200).getMaxWaitMs());
+ }
+
+ @Test
+ public void testIsEmpty_testQueueLength() throws InterruptedException {
+ assertTrue(multi.isEmpty());
+
+ // queue up two items
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ assertFalse(multi.isEmpty());
+ assertEquals(1, multi.queueLength());
+
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ assertEquals(2, multi.queueLength());
+
+ // run one - should not be empty yet
+ multi.runOneTask(0);
+ assertFalse(multi.isEmpty());
+ assertEquals(1, multi.queueLength());
+
+ // run the other - should be empty now
+ multi.runOneTask(0);
+ assertTrue(multi.isEmpty());
+ assertEquals(0, multi.queueLength());
+ }
+
+ @Test
+ public void testDestroy() throws InterruptedException {
+ // this won't interrupt
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ // these will interrupt
+ AtomicBoolean interrupted1 = new AtomicBoolean(false);
+ multi.enqueue(new WorkItem(multi, DELAY_MS) {
+ @Override
+ public void interrupt() {
+ interrupted1.set(true);
+ }
+ });
+
+ AtomicBoolean interrupted2 = new AtomicBoolean(false);
+ multi.enqueue(new WorkItem(multi, DELAY_MS) {
+ @Override
+ public void interrupt() {
+ interrupted2.set(true);
+ }
+ });
+
+ multi.destroy();
+ assertTrue(multi.isEmpty());
+
+ assertTrue(interrupted1.get());
+ assertTrue(interrupted2.get());
+ }
+
+ @Test
+ public void testRunOneTask() throws InterruptedException {
+ // nothing in the queue yet
+ assertFalse(multi.runOneTask(0));
+
+ // put something in the queue
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ final long tbegin = multi.getMillis();
+ assertTrue(multi.runOneTask(MAX_WAIT_MS));
+
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+
+ // nothing in the queue now
+ assertFalse(multi.runOneTask(0));
+
+ // time doesn't change
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+ }
+
+ @Test
+ public void testWaitFor() throws InterruptedException {
+ // queue up a couple of items
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long realBegin = System.currentTimeMillis();
+ final long tbegin = multi.getMillis();
+ multi.waitFor(DELAY_MS * 2 - 1);
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // minimal real time should have elapsed
+ assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitFor_EmptyQueue() throws InterruptedException {
+ multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+ final long realBegin = System.currentTimeMillis();
+ final long tbegin = multi.getMillis();
+
+ multi.waitFor(2);
+
+ assertEquals(tbegin + 2, multi.getMillis());
+ assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitUntilCallable() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ AtomicInteger count = new AtomicInteger(0);
+ multi.waitUntil(() -> count.incrementAndGet() == 3);
+
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // should still be one item left in the queue
+ assertEquals(1, multi.queueLength());
+ assertEquals(3, count.get());
+ }
+
+ @Test
+ public void testWaitUntilCallable_InterruptEx() throws InterruptedException {
+ multi = new TestTimeMulti();
+
+ Callable<Boolean> callable = () -> {
+ throw new InterruptedException("expected exception");
+ };
+
+ LinkedBlockingQueue<Error> errors = new LinkedBlockingQueue<>();
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ multi.waitUntil(callable);
+ } catch (Error ex) {
+ errors.add(ex);
+ }
+ }
+ };
+
+ thread.start();
+
+ Error ex = errors.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ assertNotNull(ex);
+ assertEquals("interrupted while waiting for condition: expected exception", ex.getMessage());
+ }
+
+ @Test
+ public void testWaitUntilCallable_ConditionThrowsEx() throws InterruptedException {
+ multi = new TestTimeMulti();
+
+ Callable<Boolean> callable = () -> {
+ throw new IllegalStateException("expected exception");
+ };
+
+ final long realBegin = System.currentTimeMillis();
+ assertThatThrownBy(() -> multi.waitUntil(callable))
+ .hasMessage("condition evaluator threw an exception: expected exception");
+
+ assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitUntilCallable_NeverSatisfied() throws InterruptedException {
+ multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+ final long realBegin = System.currentTimeMillis();
+ assertThatThrownBy(() -> multi.waitUntil(() -> false))
+ .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+ assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+ }
- // launch threads
- for (MyThread thr : threads) {
- thr.start();
+ @Test
+ public void testWaitUntilLongTimeUnitCallable() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ AtomicInteger count = new AtomicInteger(0);
+ multi.waitUntil(DELAY_MS * 4, TimeUnit.MILLISECONDS, () -> count.incrementAndGet() == 3);
+
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // should still be one item left in the queue
+ assertEquals(1, multi.queueLength());
+ assertEquals(3, count.get());
+ }
+
+ @Test
+ public void testWaitUntilLongTimeUnitCallable_PseudoTimeExpires() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ assertThatThrownBy(() -> multi.waitUntil(DELAY_MS * 2 - 1, TimeUnit.MILLISECONDS, () -> false))
+ .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+ }
+
+ @Test
+ public void testRunItem() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+ multi.enqueue(new MyWorkItem(fired));
+
+ assertTrue(multi.runOneTask(1));
+
+ // should no longer be in the queue
+ assertTrue(multi.isEmpty());
+
+ // should have been fired
+ assertTrue(fired.get());
+ }
+
+ @Test
+ public void testRunItem_Rescheduled() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ multi.enqueue(new MyWorkItem(fired) {
+ @Override
+ public boolean bumpNextTime() {
+ bumpNextTime(DELAY_MS);
+ return true;
+ }
+ });
+
+ assertTrue(multi.runOneTask(1));
+
+ // should still be in the queue
+ assertEquals(1, multi.queueLength());
+
+ // should have been fired
+ assertTrue(fired.get());
+ }
+
+ @Test
+ public void testRunItem_Canceled() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ multi.enqueue(new MyWorkItem(fired) {
+ @Override
+ public boolean wasCancelled() {
+ return true;
+ }
+
+ @Override
+ public boolean bumpNextTime() {
+ return true;
+ }
+ });
+
+ final long tbegin = multi.getMillis();
+ assertTrue(multi.runOneTask(1));
+
+ // time should be unchanged
+ assertEquals(tbegin, multi.getMillis());
+
+ assertTrue(multi.isEmpty());
+
+ // should not have been fired
+ assertFalse(fired.get());
+ }
+
+ @Test
+ public void testEnqueue() throws InterruptedException {
+ CountDownLatch started = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ AtomicReference<InterruptedException> ex = new AtomicReference<>();
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ started.countDown();
+
+ try {
+ multi.runOneTask(DELAY_MS * 3);
+ } catch (InterruptedException e) {
+ ex.set(e);
+ }
+
+ finished.countDown();
+ }
+ };
+
+ thread.start();
+
+ // wait for thread to start
+ started.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+ // wait for it to block on the lock
+ await().atMost(MAX_WAIT_MS, TimeUnit.MILLISECONDS).until(() -> thread.getState() == Thread.State.TIMED_WAITING);
+
+ // add an item to the queue - should trigger the thread to continue
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+ assertNull(ex.get());
+ }
+
+ @Test
+ public void testCancelItems() throws InterruptedException {
+ AtomicBoolean fired1 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired1));
+
+ AtomicBoolean fired2 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired2));
+ multi.enqueue(new MyWorkItem(fired2));
+
+ AtomicBoolean fired3 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired3));
+
+ // cancel some
+ multi.cancelItems(fired2);
+
+ // should have only canceled two of them
+ assertEquals(2, multi.queueLength());
+
+ // fire both
+ multi.runOneTask(0);
+ multi.runOneTask(0);
+
+ // these should have fired
+ assertTrue(fired1.get());
+ assertTrue(fired3.get());
+
+ // these should NOT have fired
+ assertFalse(fired2.get());
+ }
+
+ @Test
+ public void testPurgeItems() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean();
+
+ // queue up two that are canceled, one that is not
+ multi.enqueue(new MyWorkItem(true));
+ multi.enqueue(new MyWorkItem(fired));
+ multi.enqueue(new MyWorkItem(true));
+
+ multi.purgeItems();
+
+ assertEquals(1, multi.queueLength());
+
+ multi.runOneTask(0);
+ assertTrue(fired.get());
+ }
+
+ private class MyWorkItem extends WorkItem {
+ private final AtomicBoolean fired;
+ private final boolean canceled;
+
+ public MyWorkItem(AtomicBoolean fired) {
+ super(multi, DELAY_MS);
+ this.fired = fired;
+ this.canceled = false;
}
- // wait for each one to complete
- for (MyThread thr : threads) {
- assertTrue("complete " + thr.getSleepMs(), done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS));
- ttm.threadCompleted();
+ public MyWorkItem(boolean canceled) {
+ super(multi, DELAY_MS);
+ this.fired = new AtomicBoolean();
+ this.canceled = canceled;
}
- // check results
- for (MyThread thr : threads) {
- assertEquals("time " + thr.getSleepMs(), thr.texpected, thr.tactual);
+ @Override
+ public void fire() {
+ fired.set(true);
}
- assertTrue(ttm.getMillis() >= tbeg + NTIMES * MIN_SLEEP_MS);
+ @Override
+ public boolean isAssociatedWith(Object associate) {
+ return (fired == associate);
+ }
- // something in the queue, but no threads remain -> exception
- assertThatIllegalStateException().isThrownBy(() -> ttm.threadCompleted());
+ @Override
+ public boolean wasCancelled() {
+ return canceled;
+ }
}
private class MyThread extends Thread {
-
private final long sleepMs;
-
- private volatile long texpected;
- private volatile long tactual;
+ private final CountDownLatch finished = new CountDownLatch(1);
+ private InterruptedException ex = null;
public MyThread(long sleepMs) {
this.sleepMs = sleepMs;
-
this.setDaemon(true);
}
- public long getSleepMs() {
- return sleepMs;
+ public boolean await() throws InterruptedException {
+ return finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
}
@Override
public void run() {
try {
- for (int x = 0; x < NTIMES; ++x) {
- // negative sleep should have no effect
- texpected = ttm.getMillis();
- ttm.sleep(-1);
- if ((tactual = ttm.getMillis()) != texpected) {
- break;
- }
-
- texpected = ttm.getMillis() + sleepMs;
- ttm.sleep(sleepMs);
-
- if ((tactual = ttm.getMillis()) != texpected) {
- break;
- }
-
- if ((tactual = ttm.getDate().getTime()) != texpected) {
- break;
- }
- }
-
- } catch (InterruptedException expected) {
+ multi.sleep(sleepMs);
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ ex = e;
}
- done.release();
+ finished.countDown();
}
}
}