diff options
author | Jim Hahn <jrh3@att.com> | 2019-08-19 17:32:09 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2019-08-21 13:49:01 -0400 |
commit | cf0cd76d63f9eba680a6307dd4a708e7169cb403 (patch) | |
tree | 43e761425a3f3944ef52e07703ed5411388b07da /utils-test | |
parent | 98a4da643c738a4246cc4cc4aa9c9f21ae47cff8 (diff) |
Enhance TestTimeMulti
Enhance TestTimeMulti to support execution of tasks, whether
submitted via Timers or via Executors.
Change-Id: Ib5f216730b3b69028e9581052645370b827cd446
Issue-ID: POLICY-1968
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'utils-test')
19 files changed, 2405 insertions, 185 deletions
diff --git a/utils-test/pom.xml b/utils-test/pom.xml index 6ed49255..3744467f 100644 --- a/utils-test/pom.xml +++ b/utils-test/pom.xml @@ -37,6 +37,11 @@ <dependencies> <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>provided</scope> @@ -65,6 +70,12 @@ <artifactId>powermock-api-mockito</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> <dependency> <groupId>com.openpojo</groupId> <artifactId>openpojo</artifactId> diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java new file mode 100644 index 00000000..79d2f226 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import org.onap.policy.common.utils.time.TestTime; + +/** + * Work item that runs periodically. + */ +class PeriodicItem extends RunnableItem { + + /** + * Time, in milliseconds, to wait between executions. + */ + private final long periodMs; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param associate object with which this item is associated (e.g., Timer) + * @param delayMs time, in milliseconds, before this item should be executed + * @param periodMs time, in milliseconds, to delay between each execution + * @param action action to be performed + */ + public PeriodicItem(TestTime currentTime, Object associate, long delayMs, long periodMs, Runnable action) { + super(currentTime, associate, delayMs, action); + + if (periodMs <= 0) { + throw new IllegalArgumentException("invalid period " + periodMs); + } + + this.periodMs = periodMs; + } + + @Override + public boolean bumpNextTime() { + bumpNextTime(periodMs); + return true; + } + + @Override + public String toString() { + return "PeriodicItem [nextMs=" + getNextMs() + ", periodMs=" + periodMs + ", associate=" + getAssociate() + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java new file mode 100644 index 00000000..4f9b32c9 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java @@ -0,0 +1,186 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Scheduled executor service that uses {@link TestTimeMulti} to execute its tasks. Note: + * the invokeXxx() methods are not currently supported. + */ +public class PseudoScheduledExecutorService implements ScheduledExecutorService { + private static final String NOT_IMPLEMENTED_YET = "not implemented yet"; + + /** + * Object to be used to execute timer tasks. + */ + private final TestTimeMulti currentTime; + + /** + * {@code True} if {@link #shutdown()} or {@link #shutdownNow()} has been called, + * {@code false} otherwise. + */ + private boolean shutdown = false; + + /** + * Constructs the object. + * + * @param currentTime object to be used to execute timer tasks + */ + public PseudoScheduledExecutorService(TestTimeMulti currentTime) { + this.currentTime = currentTime; + } + + /** + * Cancels <i>all</i> tasks that have not yet been executed. + */ + @Override + public void shutdown() { + shutdown = true; + currentTime.cancelItems(this); + } + + /** + * Cancels <i>all</i> tasks that have not yet been executed. Does <i>not</i> interrupt + * any currently executing task. + */ + @Override + public List<Runnable> shutdownNow() { + shutdown = true; + return currentTime.cancelItems(this).stream().map(item -> ((RunnableItem) item).getAction()) + .collect(Collectors.toList()); + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public boolean isTerminated() { + return shutdown; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return shutdown; + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return enqueueRunOnce(0, new FutureTask<>(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return enqueueRunOnce(0, new FutureTask<>(task, result)); + } + + @Override + public Future<?> submit(Runnable task) { + return enqueueRunOnce(0, new FutureTask<>(task, null)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void execute(Runnable command) { + currentTime.enqueue(new RunnableItem(currentTime, this, 0, command)); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(command, null, false)); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(callable, false)); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(period), + new PseudoScheduledFuture<>(command, null, true)); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(delay), + new PseudoScheduledFuture<>(command, null, true)); + } + + /** + * Enqueues a future to be executed one time. + * + * @param delay delay until the future should be executed + * @param future future to be enqueued + * @return the future + */ + private <F extends FutureTask<T>, T> F enqueueRunOnce(long delay, F future) { + currentTime.enqueue(new RunnableItem(currentTime, this, delay, future)); + return future; + } + + /** + * Enqueues a future to be executed periodically. + * + * @param initialDelayMs delay until the future should be executed the first time + * @param periodMs delay between executions of the future + * @param future future to be enqueued + * @return the future + */ + private <T> ScheduledFuture<T> enqueuePeriodic(long initialDelayMs, long periodMs, + PseudoScheduledFuture<T> future) { + currentTime.enqueue(new PeriodicItem(currentTime, this, initialDelayMs, periodMs, future)); + return future; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java new file mode 100644 index 00000000..6ce7bc04 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.TimeUnit; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + +/** + * Scheduled future that gets its time from an associated work item. + * + * @param <T> type of result returned by the future + */ +class PseudoScheduledFuture<T> extends FutureTask<T> implements RunnableScheduledFuture<T> { + + /** + * {@code True} if this task is periodic, {@code false} otherwise. + */ + private final boolean periodic; + + /** + * The work item with which this is associated. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private WorkItem workItem; + + /** + * Constructs the object. + * + * @param runnable action to be executed + * @param result value to be returned by the {@link #get()} operation + * @param periodic {@code true} if this task is periodic, {@code false} otherwise + */ + public PseudoScheduledFuture(Runnable runnable, T result, boolean periodic) { + super(runnable, result); + this.periodic = periodic; + } + + /** + * Constructs the object. + * + * @param callable action to be executed + * @param periodic {@code true} if this task is periodic, {@code false} otherwise + */ + public PseudoScheduledFuture(Callable<T> callable, boolean periodic) { + super(callable); + this.periodic = periodic; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(workItem.getDelay(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed other) { + return Long.compare(workItem.getDelay(), other.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean isPeriodic() { + return periodic; + } + + @Override + public void run() { + if (isPeriodic()) { + super.runAndReset(); + + } else { + super.run(); + } + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java new file mode 100644 index 00000000..e8de89a4 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +/** + * A timer that uses {@link TestTimeMulti} to execute its tasks. + * + * <p/>Note: this only supports the run() method of {@link TimerTask}; the other methods, + * including cancel() are not supported. However, tasks may be canceled via + * {@link Timer#cancel()}. + * + * <p/>Currently, this does not support any of the scheduling methods that take dates, + * though that could be added relatively easily. + */ +public class PseudoTimer extends Timer { + private static final String NOT_IMPLEMENTED_YET = "not implemented yet"; + + /** + * Time with which this item is associated. + */ + private final TestTimeMulti currentTime; + + + /** + * Constructs the object. + * + * @param currentTime object to be used to execute timer tasks + */ + public PseudoTimer(TestTimeMulti currentTime) { + // create as a daemon so jvm doesn't hang when it attempts to exit + super(true); + + this.currentTime = currentTime; + + // don't need the timer's thread + super.cancel(); + } + + @Override + public void schedule(TimerTask task, long delayMs) { + currentTime.enqueue(new RunnableItem(currentTime, this, delayMs, task)); + } + + @Override + public void schedule(TimerTask task, Date time) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void schedule(TimerTask task, long delayMs, long periodMs) { + currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task)); + } + + @Override + public void schedule(TimerTask task, Date firstTime, long period) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void scheduleAtFixedRate(TimerTask task, long delayMs, long periodMs) { + currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task)); + } + + @Override + public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void cancel() { + currentTime.cancelItems(this); + } + + @Override + public int purge() { + return 0; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java new file mode 100644 index 00000000..54560316 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.Future; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Work item that may be run/executed. + */ +class RunnableItem extends WorkItem { + private static final Logger logger = LoggerFactory.getLogger(RunnableItem.class); + + /** + * Object with which this item is associated. + */ + @Getter(AccessLevel.PROTECTED) + private final Object associate; + + /** + * Action to execute. + */ + @Getter(AccessLevel.PROTECTED) + private final Runnable action; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param associate object with which this item is associated (e.g., Timer) + * @param delayMs time, in milliseconds, before this item should be executed + * @param action action to be performed + */ + public RunnableItem(TestTime currentTime, Object associate, long delayMs, Runnable action) { + super(currentTime, delayMs); + this.associate = associate; + this.action = action; + + // ensure the task can properly compute its delay + if (action instanceof PseudoScheduledFuture) { + ((PseudoScheduledFuture<?>) action).setWorkItem(this); + } + } + + @Override + public boolean isAssociatedWith(Object associate) { + return (this.associate == associate); + } + + @Override + public boolean wasCancelled() { + return (action instanceof Future && ((Future<?>) action).isCancelled()); + } + + @Override + public void fire() { + try { + action.run(); + } catch (RuntimeException e) { + logger.warn("work item {} threw an exception {}", this, e); + } + } + + @Override + public String toString() { + return "RunnableItem [nextMs=" + getNextMs() + ", associate=" + associate + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java new file mode 100644 index 00000000..1d318803 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.CountDownLatch; + +/** + * Work item used when a thread invokes sleep(). The thread's "sleep()" method will + * enqueue this item and then invoke {@link #await()} to wait for the test/controlling + * thread to fire it, indicating that the end of the sleep time has been reached. + */ +public class SleepItem extends WorkItem { + /** + * Thread that invoked "sleep()". + */ + private final Thread thread; + + /** + * This will be decremented when this work item is fired, thus releasing the + * "sleeping" thread to continue its work. + */ + private final CountDownLatch latch = new CountDownLatch(1); + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param sleepMs time for which the thread should sleep + * @param thread thread that invoked "sleep()" + */ + public SleepItem(TestTime currentTime, long sleepMs, Thread thread) { + super(currentTime, sleepMs); + this.thread = thread; + } + + @Override + public void interrupt() { + thread.interrupt(); + } + + @Override + public void fire() { + latch.countDown(); + } + + /** + * Waits for the sleep time to be reached. + * + * @throws InterruptedException if the current thread is interrupted + */ + public void await() throws InterruptedException { + latch.await(); + } + + @Override + public String toString() { + return "SleepItem [nextMs=" + getNextMs() + ", latch=" + latch + ", thread=" + thread + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java index 414c18bb..420021f3 100644 --- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * Common Utils-Test * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -37,7 +37,7 @@ public class TestTime extends CurrentTime { /** * Constructor. - * + * */ public TestTime() { super(); @@ -55,6 +55,8 @@ public class TestTime extends CurrentTime { @Override public void sleep(long sleepMs) throws InterruptedException { - tcur.addAndGet(sleepMs); + if (sleepMs > 0) { + tcur.addAndGet(sleepMs); + } } } diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java index b37e49e0..f52105ed 100644 --- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java @@ -1,6 +1,6 @@ -/* +/*- * ============LICENSE_START======================================================= - * Common Utils-Test + * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,184 +20,326 @@ package org.onap.policy.common.utils.time; -import java.util.Date; +import static org.junit.Assert.fail; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.PriorityQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * "Current" time, when running junit tests in multiple threads. This is intended to be * injected into classes under test, to replace their {@link CurrentTime} objects. The - * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion - * of "current" time forward, allowing threads to resume, as the end of their sleep time - * is reached. Additional threads do not resume until all threads have once again entered - * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a - * thread will not re-enter {@link #sleep(long)}. + * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep + * time. A queue of work items is maintained, sorted by the time for which the items are + * scheduled to execute. Tasks are executed by the test/controlling thread when one of the + * waitXxx() methods is invoked. {@link PseudoTimer} and + * {@link PseudoScheduledExecutorService} add work items to the queue. + * + * <p/> + * This only handles relatively simple situations, though it does support multi-threaded + * testing. */ -public class TestTimeMulti extends CurrentTime { +public class TestTimeMulti extends TestTime { + private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class); + + public static final String NEVER_SATISFIED = "condition was never satisfied"; + + /** + * Default value, in milliseconds, to wait for an item to be added to the queue. + */ + public static final long DEFAULT_MAX_WAIT_MS = 5000L; /** - * Number of threads that will be sleeping simultaneously. + * Maximum time that the test thread should wait for something to be added to its work + * queue. */ - private int nthreads; + @Getter + private final long maxWaitMs; /** - * "Current" time, in milliseconds, used by tests. + * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}. */ - private long tcur = System.currentTimeMillis(); + private final PriorityQueue<WorkItem> queue = + new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs())); /** - * Queue of sleeping threads waiting to be awakened. + * Lock used when modifying the queue. */ - private final PriorityQueue<Info> queue = new PriorityQueue<>(); + private final Object updateLock = new Object(); /** - * Used to synchronize updates. + * Constructs the object using the default maximum wait time. */ - private final Object locker = new Object(); + public TestTimeMulti() { + this(DEFAULT_MAX_WAIT_MS); + } /** - * Constructor. + * Constructs the object. * - * @param nthreads number of threads that will be sleeping simultaneously + * @param maxWaitMs maximum time that the test thread should wait for something to be + * added to its work queue */ - public TestTimeMulti(int nthreads) { - this.nthreads = nthreads; + public TestTimeMulti(long maxWaitMs) { + this.maxWaitMs = maxWaitMs; } - @Override - public long getMillis() { - return tcur; + /** + * Determines if the task queue is empty. + * + * @return {@code true} if the task queue is empty, {@code false} otherwise + */ + public boolean isEmpty() { + synchronized (updateLock) { + purgeItems(); + return queue.isEmpty(); + } } - @Override - public Date getDate() { - return new Date(tcur); + /** + * Gets the number of tasks in the work queue. + * + * @return the number of tasks in the work queue + */ + public int queueLength() { + synchronized (updateLock) { + purgeItems(); + return queue.size(); + } } - @Override - public void sleep(long sleepMs) throws InterruptedException { - if (sleepMs <= 0) { - return; + /** + * Indicates that this will no longer be used. Interrupts any threads that are waiting + * for their "sleep()" to complete. + */ + public void destroy() { + synchronized (updateLock) { + queue.forEach(WorkItem::interrupt); + queue.clear(); } + } - Info info = new Info(tcur + sleepMs); + /** + * Runs a single task from the queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * + * @return {@code true} if a task was run, {@code false} if the queue was empty + * @throws InterruptedException if the current thread is interrupted + */ + public boolean runOneTask(long waitMs) throws InterruptedException { + WorkItem item = pollQueue(waitMs); + if (item == null) { + return false; + } - synchronized (locker) { - queue.add(info); + runItem(item); + return true; + } - if (queue.size() == nthreads) { - // all threads are now sleeping - wake one up - wakeThreads(); + /** + * Waits for the pseudo time to reach a certain point. Executes work items until the + * time is reached. + * + * @param waitMs pseudo time, in milliseconds, for which to wait + * @throws InterruptedException if the current thread is interrupted + */ + public void waitFor(long waitMs) throws InterruptedException { + // pseudo time for which we're waiting + long tend = getMillis() + waitMs; + + while (getMillis() < tend) { + if (!runOneTask(maxWaitMs)) { + /* + * Waited the maximum poll time and nothing has happened, so we'll just + * bump the time directly. + */ + super.sleep(tend - getMillis()); + break; } } - - // this MUST happen outside of the "synchronized" block - info.await(); } /** - * Indicates that a thread has terminated or that it will no longer be invoking - * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after - * removing the terminated thread. + * Waits for a condition to become true. Executes work items until the given condition + * is true. * - * @throws IllegalStateException if the queue is already full + * @param condition condition to be checked */ - public void threadCompleted() { - synchronized (locker) { - int sz = queue.size(); - if (sz >= nthreads) { - throw new IllegalStateException("too many threads still sleeping"); - } + public void waitUntil(Callable<Boolean> condition) { + try { + // real time for which we're waiting + long realEnd = System.currentTimeMillis() + maxWaitMs; - --nthreads; + while (System.currentTimeMillis() < realEnd) { + if (condition.call()) { + return; + } - if (sz == nthreads) { - // after removing terminated thread - queue is now full; awaken something - wakeThreads(); + runOneTask(100); } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted while waiting for condition", e); + fail("interrupted while waiting for condition: " + e.getMessage()); + + } catch (Exception e) { + logger.error("condition evaluator threw an exception", e); + fail("condition evaluator threw an exception: " + e.getMessage()); } + + fail(NEVER_SATISFIED); } /** - * Advances the "current" time and awakens any threads sleeping until that time. + * Waits for a condition to become true. Executes work items until the given condition + * is true or the maximum wait time is reached. + * + * @param twait maximum, pseudo time to wait + * @param units time units represented by "twait" + * @param condition condition to be checked */ - private void wakeThreads() { - Info info = queue.poll(); - if (info == null) { - return; - } + public void waitUntil(long twait, TimeUnit units, Callable<Boolean> condition) { + // pseudo time for which we're waiting + long tend = getMillis() + units.toMillis(twait); - tcur = info.getAwakenAtMs(); - info.wake(); + waitUntil(() -> { + if (getMillis() >= tend) { + fail(NEVER_SATISFIED); + } - while ((info = queue.poll()) != null) { - if (tcur == info.getAwakenAtMs()) { - info.wake(); + return condition.call(); + }); + } - } else { - // not ready to wake this thread - put it back in the queue - queue.add(info); - break; + /** + * Gets one item from the work queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * @return the first item in the queue, or {@code null} if no item was added to the + * queue before the wait time expired + * @throws InterruptedException if the current thread was interrupted + */ + private WorkItem pollQueue(long waitMs) throws InterruptedException { + long realEnd = System.currentTimeMillis() + waitMs; + WorkItem work; + + synchronized (updateLock) { + while ((work = queue.poll()) == null) { + updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis())); + + if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) { + return null; + } } } + + return work; } /** - * Info about a sleeping thread. + * Runs a work item. + * + * @param work work item to be run + * @throws InterruptedException if the current thread was interrupted */ - private static class Info implements Comparable<Info> { - - /** - * Time, in milliseconds, at which the associated thread should awaken. - */ - private final long awakenAtMs; + private void runItem(WorkItem work) throws InterruptedException { + if (work.wasCancelled()) { + logger.info("work item was canceled {}", work); + return; + } - /** - * This is triggered when the associated thread should awaken. - */ - private final CountDownLatch latch = new CountDownLatch(1); + // update the pseudo time + super.sleep(work.getNextMs() - getMillis()); - /** - * Constructor. - * - * @param awakenAtMs time, in milliseconds, at which the associated thread should - * awaken + /* + * Add it back into the queue if appropriate, in case cancel() is called while + * it's executing. */ - public Info(long awakenAtMs) { - this.awakenAtMs = awakenAtMs; + if (work.bumpNextTime()) { + logger.info("re-enqueuing work item"); + enqueue(work); } - public long getAwakenAtMs() { - return awakenAtMs; + logger.info("fire work item {}", work); + work.fire(); + } + + @Override + public void sleep(long sleepMs) throws InterruptedException { + if (sleepMs <= 0) { + return; } - /** - * Awakens the associated thread by decrementing its latch. - */ - public void wake() { - latch.countDown(); + SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread()); + enqueue(item); + + // wait for the item to fire + logger.info("sleeping {}", item); + item.await(); + logger.info("done sleeping {}", Thread.currentThread()); + } + + /** + * Adds an item to the {@link #queue}. + * + * @param item item to be added + */ + protected void enqueue(WorkItem item) { + logger.info("enqueue work item {}", item); + synchronized (updateLock) { + queue.add(item); + updateLock.notify(); } + } - /** - * Blocks the current thread until awakened (i.e., until its latch is - * decremented). - * - * @throws InterruptedException can be interrupted - */ - public void await() throws InterruptedException { - latch.await(); + /** + * Cancels work items by removing them from the queue if they're associated with the + * specified object. + * + * @param associate object whose associated items are to be cancelled + * @return list of items that were canceled + */ + protected List<WorkItem> cancelItems(Object associate) { + List<WorkItem> items = new LinkedList<>(); + + synchronized (updateLock) { + Iterator<WorkItem> iter = queue.iterator(); + while (iter.hasNext()) { + WorkItem item = iter.next(); + if (item.isAssociatedWith(associate)) { + iter.remove(); + items.add(item); + } + } } - @Override - public int compareTo(Info object) { - int diff = Long.compare(awakenAtMs, object.awakenAtMs); + return items; + } - // this assumes that Object.toString() is unique for each Info object - if (diff == 0) { - diff = this.toString().compareTo(object.toString()); + /** + * Purges work items that are known to have been canceled. (Does not remove canceled + * TimerTasks, as there is no way via the public API to determine if the task has been + * canceled.) + */ + public void purgeItems() { + synchronized (updateLock) { + Iterator<WorkItem> iter = queue.iterator(); + while (iter.hasNext()) { + if (iter.next().wasCancelled()) { + iter.remove(); + } } - return diff; } - } } diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java new file mode 100644 index 00000000..af3d5d7e --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java @@ -0,0 +1,128 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; + +/** + * Work item to be executed at some time. + */ +class WorkItem { + + /** + * Pseudo time with which this item is associated. + */ + private final TestTime currentTime; + + /** + * Time, in milliseconds, when the timer should fire next. + */ + @Getter(AccessLevel.PROTECTED) + private long nextMs; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param delayMs time, in milliseconds, before this item should be executed + */ + public WorkItem(TestTime currentTime, long delayMs) { + if (delayMs < 0) { + throw new IllegalArgumentException("invalid delay " + delayMs); + } + + this.currentTime = currentTime; + bumpNextTime(delayMs); + } + + /** + * Gets the delay until the item should be fired. + * + * @return the delay until the item should be fired + */ + public long getDelay() { + return (nextMs - currentTime.getMillis()); + } + + /** + * Determines if this work item was canceled. + * + * @return {@code true} if this item was canceled, {@code false} otherwise + */ + public boolean wasCancelled() { + return false; + } + + /** + * Bumps {@link #nextMs}, if this is a periodic task. The default method simply + * returns {@code false}. + * + * @return {@code true} if the time was bumped, {@code false} otherwise (i.e., it is + * not a periodic task) + */ + public boolean bumpNextTime() { + return false; + } + + /** + * Bumps {@link #nextMs}, setting it to the current time plus the given delay. + * + * @param delayMs time, in milliseconds, before this item should be (re-)executed + */ + protected void bumpNextTime(long delayMs) { + if (delayMs < 0) { + throw new IllegalArgumentException("negative delay"); + } + + // always bump by at least 1 millisecond + this.nextMs = currentTime.getMillis() + Math.max(1, delayMs); + } + + /** + * Interrupts the thread that created the work item, if appropriate. The default + * method does nothing. + */ + public void interrupt() { + // do nothing + } + + /** + * Determines if this item is associated with the given object. The default method + * simply returns {@code false}. + * + * @param associate candidate associate (e.g., Timer) + * @return {@code true} if the item is associated with the given object, {@code false} + * otherwise + */ + public boolean isAssociatedWith(Object associate) { + return false; + } + + /** + * Fires/executes this item. The default method does nothing. + */ + public void fire() { + // do nothing + } +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java new file mode 100644 index 00000000..3e64edf3 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; + +public class PeriodicItemTest { + private static final long DELAY_MS = 100L; + private static final long PERIOD_MS = 200L; + private static final Object ASSOCIATE = new Object(); + + private TestTime currentTime; + private int count; + private PeriodicItem item; + + /** + * Sets up objects, including {@link #item}. + */ + @Before + public void setUp() { + currentTime = new TestTime(); + count = 0; + item = new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, PERIOD_MS, () -> count++); + } + + @Test + public void testBumpNextTime() { + assertTrue(item.bumpNextTime()); + assertEquals(currentTime.getMillis() + PERIOD_MS, item.getNextMs()); + } + + @Test + public void testToString() { + assertNotNull(item.toString()); + } + + @Test + public void testPeriodicItem() { + assertSame(ASSOCIATE, item.getAssociate()); + assertNotNull(item.getAction()); + assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs()); + + item.getAction().run(); + assertEquals(1, count); + + // invalid period + assertThatIllegalArgumentException() + .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, 0, () -> count++)); + assertThatIllegalArgumentException() + .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, -1, () -> count++)); + } + +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java new file mode 100644 index 00000000..70820c44 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java @@ -0,0 +1,267 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class PseudoScheduledExecutorServiceTest { + private static final long DELAY_MS = 100L; + private static final long PERIOD_MS = 200L; + + private int ran; + private int called; + private TestTimeMulti currentTime; + private PseudoScheduledExecutorService svc; + + /** + * Sets up objects, including {@link #svc}. + */ + @Before + public void setUp() { + ran = 0; + called = 0; + currentTime = new TestTimeMulti(); + svc = new PseudoScheduledExecutorService(currentTime); + } + + @Test + public void testShutdown() { + // submit some tasks + svc.submit(new MyRun()); + svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS); + + svc.shutdown(); + assertTrue(svc.isShutdown()); + + // task should have been removed + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testShutdownNow() { + // submit some tasks + svc.submit(new MyRun()); + svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS); + + svc.shutdownNow(); + assertTrue(svc.isShutdown()); + + // task should have been removed + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testIsShutdown_testIsTerminated() { + assertFalse(svc.isShutdown()); + assertFalse(svc.isTerminated()); + + svc.shutdown(); + assertTrue(svc.isShutdown()); + assertTrue(svc.isTerminated()); + } + + @Test + public void testAwaitTermination() throws InterruptedException { + assertFalse(svc.awaitTermination(1L, TimeUnit.SECONDS)); + + svc.shutdown(); + assertTrue(svc.awaitTermination(1L, TimeUnit.SECONDS)); + } + + @Test + public void testSubmitCallableOfT() throws Exception { + Future<Integer> future = svc.submit(new MyCallable()); + currentTime.runOneTask(0); + + assertEquals(1, called); + assertEquals(1, future.get().intValue()); + + // nothing re-queued + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testSubmitRunnableT() throws Exception { + Future<Integer> future = svc.submit(new MyRun(), 2); + currentTime.runOneTask(0); + + assertEquals(1, ran); + assertEquals(2, future.get().intValue()); + + // nothing re-queued + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testSubmitRunnable() throws Exception { + assertNotNull(svc.submit(new MyRun())); + currentTime.runOneTask(0); + + assertEquals(1, ran); + + // nothing re-queued + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testInvokeAllCollectionOfQextendsCallableOfT() { + assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList())) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testInvokeAllCollectionOfQextendsCallableOfTLongTimeUnit() { + assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList(), 1, TimeUnit.MILLISECONDS)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testInvokeAnyCollectionOfQextendsCallableOfT() { + assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList())) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testInvokeAnyCollectionOfQextendsCallableOfTLongTimeUnit() { + assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList(), 1, TimeUnit.MILLISECONDS)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testExecute() throws InterruptedException { + svc.execute(new MyRun()); + currentTime.runOneTask(0); + + assertEquals(1, ran); + + // nothing re-queued + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testScheduleRunnableLongTimeUnit() throws InterruptedException { + assertNotNull(svc.schedule(new MyRun(), DELAY_MS, TimeUnit.MILLISECONDS)); + + assertEquals(DELAY_MS, oneTaskElapsedTime()); + assertEquals(1, ran); + + // verify nothing re-scheduled + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testScheduleCallableOfVLongTimeUnit() throws Exception { + ScheduledFuture<Integer> future = svc.schedule(new MyCallable(), DELAY_MS, TimeUnit.MILLISECONDS); + + assertEquals(DELAY_MS, oneTaskElapsedTime()); + assertEquals(1, called); + assertEquals(1, future.get().intValue()); + + // verify nothing re-scheduled + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testScheduleAtFixedRate() throws InterruptedException { + final ScheduledFuture<?> future = + svc.scheduleAtFixedRate(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS); + + assertEquals(DELAY_MS, oneTaskElapsedTime()); + assertEquals(1, ran); + + assertEquals(PERIOD_MS, oneTaskElapsedTime()); + assertEquals(2, ran); + + assertEquals(PERIOD_MS, oneTaskElapsedTime()); + assertEquals(3, ran); + + future.cancel(false); + + // should not actually execute + assertEquals(0, oneTaskElapsedTime()); + assertEquals(3, ran); + + // verify nothing re-scheduled + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testScheduleWithFixedDelay() throws InterruptedException { + final ScheduledFuture<?> future = + svc.scheduleWithFixedDelay(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS); + + assertEquals(DELAY_MS, oneTaskElapsedTime()); + assertEquals(1, ran); + + assertEquals(PERIOD_MS, oneTaskElapsedTime()); + assertEquals(2, ran); + + assertEquals(PERIOD_MS, oneTaskElapsedTime()); + assertEquals(3, ran); + + future.cancel(false); + + // should not actually execute + assertEquals(0, oneTaskElapsedTime()); + assertEquals(3, ran); + + // verify nothing re-scheduled + assertTrue(currentTime.isEmpty()); + } + + /** + * Runs a single task and returns its elapsed (pseudo) time. + * + * @return the elapsed time taken to run the task + * @throws InterruptedException if the thread is interrupted + */ + private long oneTaskElapsedTime() throws InterruptedException { + final long tbegin = currentTime.getMillis(); + currentTime.runOneTask(0); + return (currentTime.getMillis() - tbegin); + } + + private class MyRun implements Runnable { + @Override + public void run() { + ++ran; + } + } + + private class MyCallable implements Callable<Integer> { + @Override + public Integer call() { + return ++called; + } + } +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java new file mode 100644 index 00000000..e23bbd29 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java @@ -0,0 +1,145 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class PseudoScheduledFutureTest { + private static final long DELAY_MS = 1000L; + + private int count; + + @Mock + private WorkItem work; + + private PseudoScheduledFuture<Integer> future; + + /** + * Sets up objects, including {@link #future}. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + when(work.getDelay()).thenReturn(DELAY_MS); + + count = 0; + future = new PseudoScheduledFuture<>(() -> ++count, true); + future.setWorkItem(work); + } + + @Test + public void testRun() { + // verify with a periodic task - should execute twice + count = 0; + future.run(); + future.run(); + assertEquals(2, count); + + // verify with an aperiodic task - should only execute once + future = new PseudoScheduledFuture<>(() -> ++count, false); + count = 0; + future.run(); + future.run(); + assertEquals(1, count); + } + + @Test + public void testPseudoScheduledFutureRunnableTBoolean() throws Exception { + final Integer result = 100; + future = new PseudoScheduledFuture<>(() -> ++count, result, true); + assertTrue(future.isPeriodic()); + future.run(); + future.run(); + assertEquals(2, count); + + // verify with aperiodic constructor + future = new PseudoScheduledFuture<>(() -> ++count, result, false); + count = 0; + assertFalse(future.isPeriodic()); + future.run(); + future.run(); + assertEquals(1, count); + assertEquals(result, future.get()); + } + + @Test + public void testPseudoScheduledFutureCallableOfTBoolean() throws Exception { + assertTrue(future.isPeriodic()); + future.run(); + future.run(); + assertEquals(2, count); + + // verify with aperiodic constructor + future = new PseudoScheduledFuture<>(() -> ++count, false); + count = 0; + assertFalse(future.isPeriodic()); + future.run(); + assertEquals(1, future.get().intValue()); + future.run(); + assertEquals(1, count); + } + + @Test + public void testGetDelay() { + assertEquals(DELAY_MS, future.getDelay(TimeUnit.MILLISECONDS)); + assertEquals(TimeUnit.MILLISECONDS.toSeconds(DELAY_MS), future.getDelay(TimeUnit.SECONDS)); + } + + @Test + public void testCompareTo() { + Delayed delayed = mock(Delayed.class); + when(delayed.getDelay(TimeUnit.MILLISECONDS)).thenReturn(DELAY_MS + 1); + + assertTrue(future.compareTo(delayed) < 0); + } + + @Test + public void testIsPeriodic() { + assertTrue(future.isPeriodic()); + assertFalse(new PseudoScheduledFuture<>(() -> ++count, false).isPeriodic()); + } + + @Test + public void testGetWorkItem() { + assertSame(work, future.getWorkItem()); + } + + @Test + public void testSetWorkItem() { + work = mock(WorkItem.class); + future.setWorkItem(work); + assertSame(work, future.getWorkItem()); + } + +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java new file mode 100644 index 00000000..49710538 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java @@ -0,0 +1,141 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Date; +import java.util.TimerTask; +import org.junit.Before; +import org.junit.Test; + +public class PseudoTimerTest { + private static final long DELAY_MS = 1000L; + private static final long PERIOD_MS = 2000L; + + private int count; + private TestTimeMulti currentTime; + private PseudoTimer timer; + + /** + * Sets up objects, including {@link #timer}. + */ + @Before + public void setUp() { + count = 0; + currentTime = new TestTimeMulti(); + timer = new PseudoTimer(currentTime); + } + + @Test + public void testCancel() { + // schedule two tasks + timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS); + timer.schedule(new MyTask(), DELAY_MS); + + assertFalse(currentTime.isEmpty()); + + // cancel the timer + timer.cancel(); + + // invoke it again to ensure no exception + timer.cancel(); + } + + @Test + public void testPurge() { + assertEquals(0, timer.purge()); + assertEquals(0, timer.purge()); + } + + @Test + public void testScheduleTimerTaskLong() throws InterruptedException { + timer.schedule(new MyTask(), DELAY_MS); + assertFalse(currentTime.isEmpty()); + + // wait for the initial delay + currentTime.waitFor(DELAY_MS); + assertEquals(1, count); + + assertTrue(currentTime.isEmpty()); + } + + @Test + public void testScheduleTimerTaskDate() { + assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date())) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testScheduleTimerTaskLongLong() throws InterruptedException { + timer.schedule(new MyTask(), DELAY_MS, PERIOD_MS); + assertFalse(currentTime.isEmpty()); + + // wait for the initial delay plus a couple of additional periods + final long tbegin = System.currentTimeMillis(); + currentTime.waitFor(DELAY_MS + PERIOD_MS * 2); + assertTrue(count >= 3); + + assertFalse(currentTime.isEmpty()); + + // this thread should not have blocked while waiting + assertTrue(System.currentTimeMillis() < tbegin + 2000); + } + + @Test + public void testScheduleTimerTaskDateLong() { + assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date(), 1L)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testScheduleAtFixedRateTimerTaskLongLong() throws InterruptedException { + timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS); + assertFalse(currentTime.isEmpty()); + + // wait for the initial delay plus a couple of additional periods + final long tbegin = System.currentTimeMillis(); + currentTime.waitFor(DELAY_MS + PERIOD_MS * 2); + assertTrue(count >= 3); + + assertFalse(currentTime.isEmpty()); + + // this thread should not have blocked while waiting + assertTrue(System.currentTimeMillis() < tbegin + 2000); + } + + @Test + public void testScheduleAtFixedRateTimerTaskDateLong() { + assertThatThrownBy(() -> timer.scheduleAtFixedRate(new MyTask(), new Date(), 1L)) + .isInstanceOf(UnsupportedOperationException.class); + } + + private class MyTask extends TimerTask { + @Override + public void run() { + ++count; + } + + } +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java new file mode 100644 index 00000000..e7bbd018 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java @@ -0,0 +1,102 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.FutureTask; +import org.junit.Before; +import org.junit.Test; + +public class RunnableItemTest { + private static final long DELAY_MS = 100L; + private static final Object ASSOCIATE = new Object(); + + private TestTime currentTime; + private int count; + private RunnableItem item; + + /** + * Sets up objects, including {@link #item}. + */ + @Before + public void setUp() { + currentTime = new TestTime(); + count = 0; + item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> count++); + } + + @Test + public void testWasCancelled() { + assertFalse(item.wasCancelled()); + + FutureTask<Object> future = new FutureTask<>(() -> count++); + item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, future); + assertFalse(item.wasCancelled()); + + future.cancel(true); + assertTrue(item.wasCancelled()); + } + + @Test + public void testIsAssociatedWith() { + assertFalse(item.isAssociatedWith(this)); + assertTrue(item.isAssociatedWith(ASSOCIATE)); + } + + @Test + public void testFire() { + item.fire(); + assertEquals(1, count); + + // verify that fire() works even if the action throws an exception + new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> { + throw new RuntimeException("expected exception"); + }).fire(); + } + + @Test + public void testRunnableItem_testGetAssociate_testGetAction() { + assertSame(ASSOCIATE, item.getAssociate()); + assertNotNull(item.getAction()); + assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs()); + + item.getAction().run(); + assertEquals(1, count); + + // verify that work item is set when constructed with a future + PseudoScheduledFuture<Integer> schedFuture = new PseudoScheduledFuture<>(() -> count + 1, false); + item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, schedFuture); + assertSame(item, schedFuture.getWorkItem()); + + // verify that work item is NOT set when constructed with a plain future + item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, new FutureTask<>(() -> count + 1)); + } + + @Test + public void testToString() { + assertNotNull(item.toString()); + } +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java new file mode 100644 index 00000000..dbd54781 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java @@ -0,0 +1,112 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; + +public class SleepItemTest { + private static final int SLEEP_MS = 250; + private static final long MAX_WAIT_MS = 5000L; + + private TestTime currentTime; + private Thread thread; + private CountDownLatch started; + private CountDownLatch finished; + private volatile InterruptedException threadEx; + private SleepItem item; + + /** + * Sets up objects, including {@link #item}. + */ + @Before + public void setUp() { + currentTime = new TestTime(); + started = new CountDownLatch(1); + finished = new CountDownLatch(1); + + thread = new Thread() { + @Override + public void run() { + try { + started.countDown(); + item.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + threadEx = e; + } + + finished.countDown(); + } + }; + thread.setDaemon(true); + + item = new SleepItem(currentTime, SLEEP_MS, thread); + } + + @Test + public void testInterrupt() throws InterruptedException { + startThread(); + + item.interrupt(); + + assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS)); + assertNotNull(threadEx); + } + + @Test + public void testFire_testAwait() throws InterruptedException { + startThread(); + + // verify that it hasn't finished yet + thread.join(250); + assertTrue(finished.getCount() > 0); + + // now fire it and verify that it finishes + item.fire(); + assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS)); + + assertNull(threadEx); + } + + @Test + public void testSleepItem() { + assertEquals(currentTime.getMillis() + SLEEP_MS, item.getNextMs()); + } + + @Test + public void testToString() { + assertNotNull(item.toString()); + } + + + private void startThread() throws InterruptedException { + thread.start(); + started.await(); + } +} diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java index f17235a2..8b6501a7 100644 --- a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java @@ -1,6 +1,6 @@ -/* +/*- * ============LICENSE_START======================================================= - * Common Utils-Test + * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,109 +20,478 @@ package org.onap.policy.common.utils.time; -import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Semaphore; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; import org.junit.Test; -/** - * Class to test TestTimeMulti. - */ public class TestTimeMultiTest { + private static final long SHORT_WAIT_MS = 100L; + private static final long DELAY_MS = 500L; + private static final long MAX_WAIT_MS = 5000L; - private static final int NTHREADS = 10; - private static final int NTIMES = 100; - private static final long WAIT_SEC = 5L; - private static final long MIN_SLEEP_MS = 5L; + private TestTimeMulti multi; - private TestTimeMulti ttm; - private Semaphore done; + @Before + public void setUp() { + multi = new TestTimeMulti(); + } @Test - public void test() throws Exception { - ttm = new TestTimeMulti(NTHREADS); - done = new Semaphore(0); + public void testSleep() throws InterruptedException { + // negative sleep time + final long tbegin = multi.getMillis(); + MyThread thread = new MyThread(-5); + thread.start(); - final long tbeg = ttm.getMillis(); + // should complete without creating a work item + assertTrue(thread.await()); + assertNull(thread.ex); - // create threads - List<MyThread> threads = new ArrayList<>(NTHREADS); - for (int x = 0; x < NTHREADS; ++x) { - threads.add(new MyThread(x + MIN_SLEEP_MS)); - } + // time should not have changed + assertEquals(tbegin, multi.getMillis()); + + + // positive sleep time + thread = new MyThread(DELAY_MS); + thread.start(); + + // must execute the SleepItem + multi.runOneTask(MAX_WAIT_MS); + + assertTrue(multi.isEmpty()); + assertTrue(thread.await()); + assertNull(thread.ex); + + // time SHOULD HAVE changed + assertEquals(tbegin + DELAY_MS, multi.getMillis()); + } + + @Test + public void testTestTimeMulti() { + assertTrue(multi.getMaxWaitMs() > 0); + } + + @Test + public void testTestTimeMultiLong() { + assertEquals(200, new TestTimeMulti(200).getMaxWaitMs()); + } + + @Test + public void testIsEmpty_testQueueLength() throws InterruptedException { + assertTrue(multi.isEmpty()); + + // queue up two items + multi.enqueue(new WorkItem(multi, DELAY_MS)); + assertFalse(multi.isEmpty()); + assertEquals(1, multi.queueLength()); + + multi.enqueue(new WorkItem(multi, DELAY_MS)); + assertEquals(2, multi.queueLength()); + + // run one - should not be empty yet + multi.runOneTask(0); + assertFalse(multi.isEmpty()); + assertEquals(1, multi.queueLength()); + + // run the other - should be empty now + multi.runOneTask(0); + assertTrue(multi.isEmpty()); + assertEquals(0, multi.queueLength()); + } + + @Test + public void testDestroy() throws InterruptedException { + // this won't interrupt + multi.enqueue(new WorkItem(multi, DELAY_MS)); + + // these will interrupt + AtomicBoolean interrupted1 = new AtomicBoolean(false); + multi.enqueue(new WorkItem(multi, DELAY_MS) { + @Override + public void interrupt() { + interrupted1.set(true); + } + }); + + AtomicBoolean interrupted2 = new AtomicBoolean(false); + multi.enqueue(new WorkItem(multi, DELAY_MS) { + @Override + public void interrupt() { + interrupted2.set(true); + } + }); + + multi.destroy(); + assertTrue(multi.isEmpty()); + + assertTrue(interrupted1.get()); + assertTrue(interrupted2.get()); + } + + @Test + public void testRunOneTask() throws InterruptedException { + // nothing in the queue yet + assertFalse(multi.runOneTask(0)); + + // put something in the queue + multi.enqueue(new WorkItem(multi, DELAY_MS)); + + final long tbegin = multi.getMillis(); + assertTrue(multi.runOneTask(MAX_WAIT_MS)); + + assertEquals(tbegin + DELAY_MS, multi.getMillis()); + + // nothing in the queue now + assertFalse(multi.runOneTask(0)); + + // time doesn't change + assertEquals(tbegin + DELAY_MS, multi.getMillis()); + } + + @Test + public void testWaitFor() throws InterruptedException { + // queue up a couple of items + multi.enqueue(new WorkItem(multi, DELAY_MS)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 2)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 3)); + + final long realBegin = System.currentTimeMillis(); + final long tbegin = multi.getMillis(); + multi.waitFor(DELAY_MS * 2 - 1); + assertEquals(tbegin + DELAY_MS * 2, multi.getMillis()); + + // minimal real time should have elapsed + assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS); + } + + @Test + public void testWaitFor_EmptyQueue() throws InterruptedException { + multi = new TestTimeMulti(SHORT_WAIT_MS); + + final long realBegin = System.currentTimeMillis(); + final long tbegin = multi.getMillis(); + + multi.waitFor(2); + + assertEquals(tbegin + 2, multi.getMillis()); + assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS); + } + + @Test + public void testWaitUntilCallable() throws InterruptedException { + multi.enqueue(new WorkItem(multi, DELAY_MS)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 2)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 3)); + + final long tbegin = multi.getMillis(); + AtomicInteger count = new AtomicInteger(0); + multi.waitUntil(() -> count.incrementAndGet() == 3); + + assertEquals(tbegin + DELAY_MS * 2, multi.getMillis()); + + // should still be one item left in the queue + assertEquals(1, multi.queueLength()); + assertEquals(3, count.get()); + } + + @Test + public void testWaitUntilCallable_InterruptEx() throws InterruptedException { + multi = new TestTimeMulti(); + + Callable<Boolean> callable = () -> { + throw new InterruptedException("expected exception"); + }; + + LinkedBlockingQueue<Error> errors = new LinkedBlockingQueue<>(); + + Thread thread = new Thread() { + @Override + public void run() { + try { + multi.waitUntil(callable); + } catch (Error ex) { + errors.add(ex); + } + } + }; + + thread.start(); + + Error ex = errors.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + assertNotNull(ex); + assertEquals("interrupted while waiting for condition: expected exception", ex.getMessage()); + } + + @Test + public void testWaitUntilCallable_ConditionThrowsEx() throws InterruptedException { + multi = new TestTimeMulti(); + + Callable<Boolean> callable = () -> { + throw new IllegalStateException("expected exception"); + }; + + final long realBegin = System.currentTimeMillis(); + assertThatThrownBy(() -> multi.waitUntil(callable)) + .hasMessage("condition evaluator threw an exception: expected exception"); + + assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS); + } + + @Test + public void testWaitUntilCallable_NeverSatisfied() throws InterruptedException { + multi = new TestTimeMulti(SHORT_WAIT_MS); + + final long realBegin = System.currentTimeMillis(); + assertThatThrownBy(() -> multi.waitUntil(() -> false)) + .hasMessage(TestTimeMulti.NEVER_SATISFIED); + assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS); + } - // launch threads - for (MyThread thr : threads) { - thr.start(); + @Test + public void testWaitUntilLongTimeUnitCallable() throws InterruptedException { + multi.enqueue(new WorkItem(multi, DELAY_MS)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 2)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 3)); + + final long tbegin = multi.getMillis(); + AtomicInteger count = new AtomicInteger(0); + multi.waitUntil(DELAY_MS * 4, TimeUnit.MILLISECONDS, () -> count.incrementAndGet() == 3); + + assertEquals(tbegin + DELAY_MS * 2, multi.getMillis()); + + // should still be one item left in the queue + assertEquals(1, multi.queueLength()); + assertEquals(3, count.get()); + } + + @Test + public void testWaitUntilLongTimeUnitCallable_PseudoTimeExpires() throws InterruptedException { + multi.enqueue(new WorkItem(multi, DELAY_MS)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 2)); + multi.enqueue(new WorkItem(multi, DELAY_MS * 3)); + + final long tbegin = multi.getMillis(); + assertThatThrownBy(() -> multi.waitUntil(DELAY_MS * 2 - 1, TimeUnit.MILLISECONDS, () -> false)) + .hasMessage(TestTimeMulti.NEVER_SATISFIED); + assertEquals(tbegin + DELAY_MS * 2, multi.getMillis()); + } + + @Test + public void testRunItem() throws InterruptedException { + AtomicBoolean fired = new AtomicBoolean(false); + multi.enqueue(new MyWorkItem(fired)); + + assertTrue(multi.runOneTask(1)); + + // should no longer be in the queue + assertTrue(multi.isEmpty()); + + // should have been fired + assertTrue(fired.get()); + } + + @Test + public void testRunItem_Rescheduled() throws InterruptedException { + AtomicBoolean fired = new AtomicBoolean(false); + + multi.enqueue(new MyWorkItem(fired) { + @Override + public boolean bumpNextTime() { + bumpNextTime(DELAY_MS); + return true; + } + }); + + assertTrue(multi.runOneTask(1)); + + // should still be in the queue + assertEquals(1, multi.queueLength()); + + // should have been fired + assertTrue(fired.get()); + } + + @Test + public void testRunItem_Canceled() throws InterruptedException { + AtomicBoolean fired = new AtomicBoolean(false); + + multi.enqueue(new MyWorkItem(fired) { + @Override + public boolean wasCancelled() { + return true; + } + + @Override + public boolean bumpNextTime() { + return true; + } + }); + + final long tbegin = multi.getMillis(); + assertTrue(multi.runOneTask(1)); + + // time should be unchanged + assertEquals(tbegin, multi.getMillis()); + + assertTrue(multi.isEmpty()); + + // should not have been fired + assertFalse(fired.get()); + } + + @Test + public void testEnqueue() throws InterruptedException { + CountDownLatch started = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + AtomicReference<InterruptedException> ex = new AtomicReference<>(); + + Thread thread = new Thread() { + @Override + public void run() { + started.countDown(); + + try { + multi.runOneTask(DELAY_MS * 3); + } catch (InterruptedException e) { + ex.set(e); + } + + finished.countDown(); + } + }; + + thread.start(); + + // wait for thread to start + started.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + + // wait for it to block on the lock + await().atMost(MAX_WAIT_MS, TimeUnit.MILLISECONDS).until(() -> thread.getState() == Thread.State.TIMED_WAITING); + + // add an item to the queue - should trigger the thread to continue + multi.enqueue(new WorkItem(multi, DELAY_MS)); + + assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS)); + assertNull(ex.get()); + } + + @Test + public void testCancelItems() throws InterruptedException { + AtomicBoolean fired1 = new AtomicBoolean(); + multi.enqueue(new MyWorkItem(fired1)); + + AtomicBoolean fired2 = new AtomicBoolean(); + multi.enqueue(new MyWorkItem(fired2)); + multi.enqueue(new MyWorkItem(fired2)); + + AtomicBoolean fired3 = new AtomicBoolean(); + multi.enqueue(new MyWorkItem(fired3)); + + // cancel some + multi.cancelItems(fired2); + + // should have only canceled two of them + assertEquals(2, multi.queueLength()); + + // fire both + multi.runOneTask(0); + multi.runOneTask(0); + + // these should have fired + assertTrue(fired1.get()); + assertTrue(fired3.get()); + + // these should NOT have fired + assertFalse(fired2.get()); + } + + @Test + public void testPurgeItems() throws InterruptedException { + AtomicBoolean fired = new AtomicBoolean(); + + // queue up two that are canceled, one that is not + multi.enqueue(new MyWorkItem(true)); + multi.enqueue(new MyWorkItem(fired)); + multi.enqueue(new MyWorkItem(true)); + + multi.purgeItems(); + + assertEquals(1, multi.queueLength()); + + multi.runOneTask(0); + assertTrue(fired.get()); + } + + private class MyWorkItem extends WorkItem { + private final AtomicBoolean fired; + private final boolean canceled; + + public MyWorkItem(AtomicBoolean fired) { + super(multi, DELAY_MS); + this.fired = fired; + this.canceled = false; } - // wait for each one to complete - for (MyThread thr : threads) { - assertTrue("complete " + thr.getSleepMs(), done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS)); - ttm.threadCompleted(); + public MyWorkItem(boolean canceled) { + super(multi, DELAY_MS); + this.fired = new AtomicBoolean(); + this.canceled = canceled; } - // check results - for (MyThread thr : threads) { - assertEquals("time " + thr.getSleepMs(), thr.texpected, thr.tactual); + @Override + public void fire() { + fired.set(true); } - assertTrue(ttm.getMillis() >= tbeg + NTIMES * MIN_SLEEP_MS); + @Override + public boolean isAssociatedWith(Object associate) { + return (fired == associate); + } - // something in the queue, but no threads remain -> exception - assertThatIllegalStateException().isThrownBy(() -> ttm.threadCompleted()); + @Override + public boolean wasCancelled() { + return canceled; + } } private class MyThread extends Thread { - private final long sleepMs; - - private volatile long texpected; - private volatile long tactual; + private final CountDownLatch finished = new CountDownLatch(1); + private InterruptedException ex = null; public MyThread(long sleepMs) { this.sleepMs = sleepMs; - this.setDaemon(true); } - public long getSleepMs() { - return sleepMs; + public boolean await() throws InterruptedException { + return finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS); } @Override public void run() { try { - for (int x = 0; x < NTIMES; ++x) { - // negative sleep should have no effect - texpected = ttm.getMillis(); - ttm.sleep(-1); - if ((tactual = ttm.getMillis()) != texpected) { - break; - } - - texpected = ttm.getMillis() + sleepMs; - ttm.sleep(sleepMs); - - if ((tactual = ttm.getMillis()) != texpected) { - break; - } - - if ((tactual = ttm.getDate().getTime()) != texpected) { - break; - } - } - - } catch (InterruptedException expected) { + multi.sleep(sleepMs); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); + ex = e; } - done.release(); + finished.countDown(); } } } diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java index 3e7897e9..d2cf6783 100644 --- a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * Common Utils-Test * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -62,6 +62,11 @@ public class TestTimeTest { // ensure that no real time has elapsed assertTrue(System.currentTimeMillis() < treal + tsleep / 2); + + // negative sleep should not modify the time + tcur = tm.getMillis(); + tm.sleep(-1); + assertEquals(tcur, tm.getMillis()); } } diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java new file mode 100644 index 00000000..4e6f92b5 --- /dev/null +++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.junit.Before; +import org.junit.Test; + +public class WorkItemTest { + private TestTime currentTime; + private WorkItem item; + + @Before + public void setUp() { + currentTime = new TestTime(); + item = new WorkItem(currentTime, 0); + } + + @Test + public void testWorkItem() { + assertThatIllegalArgumentException().isThrownBy(() -> new WorkItem(currentTime, -1)); + + // should not throw an exception + new WorkItem(currentTime, 1); + } + + @Test + public void testGetDelay() { + assertEquals(1, item.getDelay()); + } + + @Test + public void testWasCancelled() { + assertFalse(item.wasCancelled()); + } + + @Test + public void testBumpNextTime() { + assertFalse(item.bumpNextTime()); + } + + @Test + public void testBumpNextTimeLong() { + assertThatIllegalArgumentException().isThrownBy(() -> item.bumpNextTime(-1)); + + long cur = currentTime.getMillis(); + item.bumpNextTime(5); + assertEquals(cur + 5, item.getNextMs()); + + item.bumpNextTime(0); + + // should bump the time by at least 1 + assertEquals(cur + 1, item.getNextMs()); + } + + @Test + public void testInterrupt() { + item.interrupt(); + assertFalse(Thread.interrupted()); + } + + @Test + public void testIsAssociatedWith() { + assertFalse(item.isAssociatedWith(this)); + } + + @Test + public void testFire() { + // ensure no exception is thrown + item.fire(); + } + + @Test + public void testGetNextMs() { + assertEquals(currentTime.getMillis() + 1, item.getNextMs()); + assertEquals(currentTime.getMillis() + 10, new WorkItem(currentTime, 10).getNextMs()); + } + +} |