From cf0cd76d63f9eba680a6307dd4a708e7169cb403 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Mon, 19 Aug 2019 17:32:09 -0400 Subject: Enhance TestTimeMulti Enhance TestTimeMulti to support execution of tasks, whether submitted via Timers or via Executors. Change-Id: Ib5f216730b3b69028e9581052645370b827cd446 Issue-ID: POLICY-1968 Signed-off-by: Jim Hahn --- .../policy/common/utils/time/PeriodicItem.java | 65 ++++ .../utils/time/PseudoScheduledExecutorService.java | 186 +++++++++++ .../common/utils/time/PseudoScheduledFuture.java | 98 ++++++ .../onap/policy/common/utils/time/PseudoTimer.java | 100 ++++++ .../policy/common/utils/time/RunnableItem.java | 91 ++++++ .../onap/policy/common/utils/time/SleepItem.java | 78 +++++ .../onap/policy/common/utils/time/TestTime.java | 12 +- .../policy/common/utils/time/TestTimeMulti.java | 364 ++++++++++++++------- .../onap/policy/common/utils/time/WorkItem.java | 128 ++++++++ 9 files changed, 1006 insertions(+), 116 deletions(-) create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java (limited to 'utils-test/src/main/java') diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java new file mode 100644 index 00000000..79d2f226 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import org.onap.policy.common.utils.time.TestTime; + +/** + * Work item that runs periodically. + */ +class PeriodicItem extends RunnableItem { + + /** + * Time, in milliseconds, to wait between executions. + */ + private final long periodMs; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param associate object with which this item is associated (e.g., Timer) + * @param delayMs time, in milliseconds, before this item should be executed + * @param periodMs time, in milliseconds, to delay between each execution + * @param action action to be performed + */ + public PeriodicItem(TestTime currentTime, Object associate, long delayMs, long periodMs, Runnable action) { + super(currentTime, associate, delayMs, action); + + if (periodMs <= 0) { + throw new IllegalArgumentException("invalid period " + periodMs); + } + + this.periodMs = periodMs; + } + + @Override + public boolean bumpNextTime() { + bumpNextTime(periodMs); + return true; + } + + @Override + public String toString() { + return "PeriodicItem [nextMs=" + getNextMs() + ", periodMs=" + periodMs + ", associate=" + getAssociate() + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java new file mode 100644 index 00000000..4f9b32c9 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java @@ -0,0 +1,186 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Scheduled executor service that uses {@link TestTimeMulti} to execute its tasks. Note: + * the invokeXxx() methods are not currently supported. + */ +public class PseudoScheduledExecutorService implements ScheduledExecutorService { + private static final String NOT_IMPLEMENTED_YET = "not implemented yet"; + + /** + * Object to be used to execute timer tasks. + */ + private final TestTimeMulti currentTime; + + /** + * {@code True} if {@link #shutdown()} or {@link #shutdownNow()} has been called, + * {@code false} otherwise. + */ + private boolean shutdown = false; + + /** + * Constructs the object. + * + * @param currentTime object to be used to execute timer tasks + */ + public PseudoScheduledExecutorService(TestTimeMulti currentTime) { + this.currentTime = currentTime; + } + + /** + * Cancels all tasks that have not yet been executed. + */ + @Override + public void shutdown() { + shutdown = true; + currentTime.cancelItems(this); + } + + /** + * Cancels all tasks that have not yet been executed. Does not interrupt + * any currently executing task. + */ + @Override + public List shutdownNow() { + shutdown = true; + return currentTime.cancelItems(this).stream().map(item -> ((RunnableItem) item).getAction()) + .collect(Collectors.toList()); + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public boolean isTerminated() { + return shutdown; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return shutdown; + } + + @Override + public Future submit(Callable task) { + return enqueueRunOnce(0, new FutureTask<>(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return enqueueRunOnce(0, new FutureTask<>(task, result)); + } + + @Override + public Future submit(Runnable task) { + return enqueueRunOnce(0, new FutureTask<>(task, null)); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void execute(Runnable command) { + currentTime.enqueue(new RunnableItem(currentTime, this, 0, command)); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(command, null, false)); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(callable, false)); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(period), + new PseudoScheduledFuture<>(command, null, true)); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(delay), + new PseudoScheduledFuture<>(command, null, true)); + } + + /** + * Enqueues a future to be executed one time. + * + * @param delay delay until the future should be executed + * @param future future to be enqueued + * @return the future + */ + private , T> F enqueueRunOnce(long delay, F future) { + currentTime.enqueue(new RunnableItem(currentTime, this, delay, future)); + return future; + } + + /** + * Enqueues a future to be executed periodically. + * + * @param initialDelayMs delay until the future should be executed the first time + * @param periodMs delay between executions of the future + * @param future future to be enqueued + * @return the future + */ + private ScheduledFuture enqueuePeriodic(long initialDelayMs, long periodMs, + PseudoScheduledFuture future) { + currentTime.enqueue(new PeriodicItem(currentTime, this, initialDelayMs, periodMs, future)); + return future; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java new file mode 100644 index 00000000..6ce7bc04 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.TimeUnit; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + +/** + * Scheduled future that gets its time from an associated work item. + * + * @param type of result returned by the future + */ +class PseudoScheduledFuture extends FutureTask implements RunnableScheduledFuture { + + /** + * {@code True} if this task is periodic, {@code false} otherwise. + */ + private final boolean periodic; + + /** + * The work item with which this is associated. + */ + @Getter(AccessLevel.PROTECTED) + @Setter(AccessLevel.PROTECTED) + private WorkItem workItem; + + /** + * Constructs the object. + * + * @param runnable action to be executed + * @param result value to be returned by the {@link #get()} operation + * @param periodic {@code true} if this task is periodic, {@code false} otherwise + */ + public PseudoScheduledFuture(Runnable runnable, T result, boolean periodic) { + super(runnable, result); + this.periodic = periodic; + } + + /** + * Constructs the object. + * + * @param callable action to be executed + * @param periodic {@code true} if this task is periodic, {@code false} otherwise + */ + public PseudoScheduledFuture(Callable callable, boolean periodic) { + super(callable); + this.periodic = periodic; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(workItem.getDelay(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed other) { + return Long.compare(workItem.getDelay(), other.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean isPeriodic() { + return periodic; + } + + @Override + public void run() { + if (isPeriodic()) { + super.runAndReset(); + + } else { + super.run(); + } + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java new file mode 100644 index 00000000..e8de89a4 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +/** + * A timer that uses {@link TestTimeMulti} to execute its tasks. + * + *

Note: this only supports the run() method of {@link TimerTask}; the other methods, + * including cancel() are not supported. However, tasks may be canceled via + * {@link Timer#cancel()}. + * + *

Currently, this does not support any of the scheduling methods that take dates, + * though that could be added relatively easily. + */ +public class PseudoTimer extends Timer { + private static final String NOT_IMPLEMENTED_YET = "not implemented yet"; + + /** + * Time with which this item is associated. + */ + private final TestTimeMulti currentTime; + + + /** + * Constructs the object. + * + * @param currentTime object to be used to execute timer tasks + */ + public PseudoTimer(TestTimeMulti currentTime) { + // create as a daemon so jvm doesn't hang when it attempts to exit + super(true); + + this.currentTime = currentTime; + + // don't need the timer's thread + super.cancel(); + } + + @Override + public void schedule(TimerTask task, long delayMs) { + currentTime.enqueue(new RunnableItem(currentTime, this, delayMs, task)); + } + + @Override + public void schedule(TimerTask task, Date time) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void schedule(TimerTask task, long delayMs, long periodMs) { + currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task)); + } + + @Override + public void schedule(TimerTask task, Date firstTime, long period) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void scheduleAtFixedRate(TimerTask task, long delayMs, long periodMs) { + currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task)); + } + + @Override + public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { + throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET); + } + + @Override + public void cancel() { + currentTime.cancelItems(this); + } + + @Override + public int purge() { + return 0; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java new file mode 100644 index 00000000..54560316 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.Future; +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Work item that may be run/executed. + */ +class RunnableItem extends WorkItem { + private static final Logger logger = LoggerFactory.getLogger(RunnableItem.class); + + /** + * Object with which this item is associated. + */ + @Getter(AccessLevel.PROTECTED) + private final Object associate; + + /** + * Action to execute. + */ + @Getter(AccessLevel.PROTECTED) + private final Runnable action; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param associate object with which this item is associated (e.g., Timer) + * @param delayMs time, in milliseconds, before this item should be executed + * @param action action to be performed + */ + public RunnableItem(TestTime currentTime, Object associate, long delayMs, Runnable action) { + super(currentTime, delayMs); + this.associate = associate; + this.action = action; + + // ensure the task can properly compute its delay + if (action instanceof PseudoScheduledFuture) { + ((PseudoScheduledFuture) action).setWorkItem(this); + } + } + + @Override + public boolean isAssociatedWith(Object associate) { + return (this.associate == associate); + } + + @Override + public boolean wasCancelled() { + return (action instanceof Future && ((Future) action).isCancelled()); + } + + @Override + public void fire() { + try { + action.run(); + } catch (RuntimeException e) { + logger.warn("work item {} threw an exception {}", this, e); + } + } + + @Override + public String toString() { + return "RunnableItem [nextMs=" + getNextMs() + ", associate=" + associate + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java new file mode 100644 index 00000000..1d318803 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.concurrent.CountDownLatch; + +/** + * Work item used when a thread invokes sleep(). The thread's "sleep()" method will + * enqueue this item and then invoke {@link #await()} to wait for the test/controlling + * thread to fire it, indicating that the end of the sleep time has been reached. + */ +public class SleepItem extends WorkItem { + /** + * Thread that invoked "sleep()". + */ + private final Thread thread; + + /** + * This will be decremented when this work item is fired, thus releasing the + * "sleeping" thread to continue its work. + */ + private final CountDownLatch latch = new CountDownLatch(1); + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param sleepMs time for which the thread should sleep + * @param thread thread that invoked "sleep()" + */ + public SleepItem(TestTime currentTime, long sleepMs, Thread thread) { + super(currentTime, sleepMs); + this.thread = thread; + } + + @Override + public void interrupt() { + thread.interrupt(); + } + + @Override + public void fire() { + latch.countDown(); + } + + /** + * Waits for the sleep time to be reached. + * + * @throws InterruptedException if the current thread is interrupted + */ + public void await() throws InterruptedException { + latch.await(); + } + + @Override + public String toString() { + return "SleepItem [nextMs=" + getNextMs() + ", latch=" + latch + ", thread=" + thread + "]"; + } +} diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java index 414c18bb..420021f3 100644 --- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * Common Utils-Test * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -37,7 +37,7 @@ public class TestTime extends CurrentTime { /** * Constructor. - * + * */ public TestTime() { super(); @@ -55,6 +55,8 @@ public class TestTime extends CurrentTime { @Override public void sleep(long sleepMs) throws InterruptedException { - tcur.addAndGet(sleepMs); + if (sleepMs > 0) { + tcur.addAndGet(sleepMs); + } } } diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java index b37e49e0..f52105ed 100644 --- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java @@ -1,6 +1,6 @@ -/* +/*- * ============LICENSE_START======================================================= - * Common Utils-Test + * ONAP * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ @@ -20,184 +20,326 @@ package org.onap.policy.common.utils.time; -import java.util.Date; +import static org.junit.Assert.fail; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.PriorityQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * "Current" time, when running junit tests in multiple threads. This is intended to be * injected into classes under test, to replace their {@link CurrentTime} objects. The - * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion - * of "current" time forward, allowing threads to resume, as the end of their sleep time - * is reached. Additional threads do not resume until all threads have once again entered - * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a - * thread will not re-enter {@link #sleep(long)}. + * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep + * time. A queue of work items is maintained, sorted by the time for which the items are + * scheduled to execute. Tasks are executed by the test/controlling thread when one of the + * waitXxx() methods is invoked. {@link PseudoTimer} and + * {@link PseudoScheduledExecutorService} add work items to the queue. + * + *

+ * This only handles relatively simple situations, though it does support multi-threaded + * testing. */ -public class TestTimeMulti extends CurrentTime { +public class TestTimeMulti extends TestTime { + private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class); + + public static final String NEVER_SATISFIED = "condition was never satisfied"; + + /** + * Default value, in milliseconds, to wait for an item to be added to the queue. + */ + public static final long DEFAULT_MAX_WAIT_MS = 5000L; /** - * Number of threads that will be sleeping simultaneously. + * Maximum time that the test thread should wait for something to be added to its work + * queue. */ - private int nthreads; + @Getter + private final long maxWaitMs; /** - * "Current" time, in milliseconds, used by tests. + * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}. */ - private long tcur = System.currentTimeMillis(); + private final PriorityQueue queue = + new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs())); /** - * Queue of sleeping threads waiting to be awakened. + * Lock used when modifying the queue. */ - private final PriorityQueue queue = new PriorityQueue<>(); + private final Object updateLock = new Object(); /** - * Used to synchronize updates. + * Constructs the object using the default maximum wait time. */ - private final Object locker = new Object(); + public TestTimeMulti() { + this(DEFAULT_MAX_WAIT_MS); + } /** - * Constructor. + * Constructs the object. * - * @param nthreads number of threads that will be sleeping simultaneously + * @param maxWaitMs maximum time that the test thread should wait for something to be + * added to its work queue */ - public TestTimeMulti(int nthreads) { - this.nthreads = nthreads; + public TestTimeMulti(long maxWaitMs) { + this.maxWaitMs = maxWaitMs; } - @Override - public long getMillis() { - return tcur; + /** + * Determines if the task queue is empty. + * + * @return {@code true} if the task queue is empty, {@code false} otherwise + */ + public boolean isEmpty() { + synchronized (updateLock) { + purgeItems(); + return queue.isEmpty(); + } } - @Override - public Date getDate() { - return new Date(tcur); + /** + * Gets the number of tasks in the work queue. + * + * @return the number of tasks in the work queue + */ + public int queueLength() { + synchronized (updateLock) { + purgeItems(); + return queue.size(); + } } - @Override - public void sleep(long sleepMs) throws InterruptedException { - if (sleepMs <= 0) { - return; + /** + * Indicates that this will no longer be used. Interrupts any threads that are waiting + * for their "sleep()" to complete. + */ + public void destroy() { + synchronized (updateLock) { + queue.forEach(WorkItem::interrupt); + queue.clear(); } + } - Info info = new Info(tcur + sleepMs); + /** + * Runs a single task from the queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * + * @return {@code true} if a task was run, {@code false} if the queue was empty + * @throws InterruptedException if the current thread is interrupted + */ + public boolean runOneTask(long waitMs) throws InterruptedException { + WorkItem item = pollQueue(waitMs); + if (item == null) { + return false; + } - synchronized (locker) { - queue.add(info); + runItem(item); + return true; + } - if (queue.size() == nthreads) { - // all threads are now sleeping - wake one up - wakeThreads(); + /** + * Waits for the pseudo time to reach a certain point. Executes work items until the + * time is reached. + * + * @param waitMs pseudo time, in milliseconds, for which to wait + * @throws InterruptedException if the current thread is interrupted + */ + public void waitFor(long waitMs) throws InterruptedException { + // pseudo time for which we're waiting + long tend = getMillis() + waitMs; + + while (getMillis() < tend) { + if (!runOneTask(maxWaitMs)) { + /* + * Waited the maximum poll time and nothing has happened, so we'll just + * bump the time directly. + */ + super.sleep(tend - getMillis()); + break; } } - - // this MUST happen outside of the "synchronized" block - info.await(); } /** - * Indicates that a thread has terminated or that it will no longer be invoking - * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after - * removing the terminated thread. + * Waits for a condition to become true. Executes work items until the given condition + * is true. * - * @throws IllegalStateException if the queue is already full + * @param condition condition to be checked */ - public void threadCompleted() { - synchronized (locker) { - int sz = queue.size(); - if (sz >= nthreads) { - throw new IllegalStateException("too many threads still sleeping"); - } + public void waitUntil(Callable condition) { + try { + // real time for which we're waiting + long realEnd = System.currentTimeMillis() + maxWaitMs; - --nthreads; + while (System.currentTimeMillis() < realEnd) { + if (condition.call()) { + return; + } - if (sz == nthreads) { - // after removing terminated thread - queue is now full; awaken something - wakeThreads(); + runOneTask(100); } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("interrupted while waiting for condition", e); + fail("interrupted while waiting for condition: " + e.getMessage()); + + } catch (Exception e) { + logger.error("condition evaluator threw an exception", e); + fail("condition evaluator threw an exception: " + e.getMessage()); } + + fail(NEVER_SATISFIED); } /** - * Advances the "current" time and awakens any threads sleeping until that time. + * Waits for a condition to become true. Executes work items until the given condition + * is true or the maximum wait time is reached. + * + * @param twait maximum, pseudo time to wait + * @param units time units represented by "twait" + * @param condition condition to be checked */ - private void wakeThreads() { - Info info = queue.poll(); - if (info == null) { - return; - } + public void waitUntil(long twait, TimeUnit units, Callable condition) { + // pseudo time for which we're waiting + long tend = getMillis() + units.toMillis(twait); - tcur = info.getAwakenAtMs(); - info.wake(); + waitUntil(() -> { + if (getMillis() >= tend) { + fail(NEVER_SATISFIED); + } - while ((info = queue.poll()) != null) { - if (tcur == info.getAwakenAtMs()) { - info.wake(); + return condition.call(); + }); + } - } else { - // not ready to wake this thread - put it back in the queue - queue.add(info); - break; + /** + * Gets one item from the work queue. + * + * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather + * than pseudo time + * @return the first item in the queue, or {@code null} if no item was added to the + * queue before the wait time expired + * @throws InterruptedException if the current thread was interrupted + */ + private WorkItem pollQueue(long waitMs) throws InterruptedException { + long realEnd = System.currentTimeMillis() + waitMs; + WorkItem work; + + synchronized (updateLock) { + while ((work = queue.poll()) == null) { + updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis())); + + if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) { + return null; + } } } + + return work; } /** - * Info about a sleeping thread. + * Runs a work item. + * + * @param work work item to be run + * @throws InterruptedException if the current thread was interrupted */ - private static class Info implements Comparable { - - /** - * Time, in milliseconds, at which the associated thread should awaken. - */ - private final long awakenAtMs; + private void runItem(WorkItem work) throws InterruptedException { + if (work.wasCancelled()) { + logger.info("work item was canceled {}", work); + return; + } - /** - * This is triggered when the associated thread should awaken. - */ - private final CountDownLatch latch = new CountDownLatch(1); + // update the pseudo time + super.sleep(work.getNextMs() - getMillis()); - /** - * Constructor. - * - * @param awakenAtMs time, in milliseconds, at which the associated thread should - * awaken + /* + * Add it back into the queue if appropriate, in case cancel() is called while + * it's executing. */ - public Info(long awakenAtMs) { - this.awakenAtMs = awakenAtMs; + if (work.bumpNextTime()) { + logger.info("re-enqueuing work item"); + enqueue(work); } - public long getAwakenAtMs() { - return awakenAtMs; + logger.info("fire work item {}", work); + work.fire(); + } + + @Override + public void sleep(long sleepMs) throws InterruptedException { + if (sleepMs <= 0) { + return; } - /** - * Awakens the associated thread by decrementing its latch. - */ - public void wake() { - latch.countDown(); + SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread()); + enqueue(item); + + // wait for the item to fire + logger.info("sleeping {}", item); + item.await(); + logger.info("done sleeping {}", Thread.currentThread()); + } + + /** + * Adds an item to the {@link #queue}. + * + * @param item item to be added + */ + protected void enqueue(WorkItem item) { + logger.info("enqueue work item {}", item); + synchronized (updateLock) { + queue.add(item); + updateLock.notify(); } + } - /** - * Blocks the current thread until awakened (i.e., until its latch is - * decremented). - * - * @throws InterruptedException can be interrupted - */ - public void await() throws InterruptedException { - latch.await(); + /** + * Cancels work items by removing them from the queue if they're associated with the + * specified object. + * + * @param associate object whose associated items are to be cancelled + * @return list of items that were canceled + */ + protected List cancelItems(Object associate) { + List items = new LinkedList<>(); + + synchronized (updateLock) { + Iterator iter = queue.iterator(); + while (iter.hasNext()) { + WorkItem item = iter.next(); + if (item.isAssociatedWith(associate)) { + iter.remove(); + items.add(item); + } + } } - @Override - public int compareTo(Info object) { - int diff = Long.compare(awakenAtMs, object.awakenAtMs); + return items; + } - // this assumes that Object.toString() is unique for each Info object - if (diff == 0) { - diff = this.toString().compareTo(object.toString()); + /** + * Purges work items that are known to have been canceled. (Does not remove canceled + * TimerTasks, as there is no way via the public API to determine if the task has been + * canceled.) + */ + public void purgeItems() { + synchronized (updateLock) { + Iterator iter = queue.iterator(); + while (iter.hasNext()) { + if (iter.next().wasCancelled()) { + iter.remove(); + } } - return diff; } - } } diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java new file mode 100644 index 00000000..af3d5d7e --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java @@ -0,0 +1,128 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import lombok.AccessLevel; +import lombok.Getter; +import org.onap.policy.common.utils.time.TestTime; + +/** + * Work item to be executed at some time. + */ +class WorkItem { + + /** + * Pseudo time with which this item is associated. + */ + private final TestTime currentTime; + + /** + * Time, in milliseconds, when the timer should fire next. + */ + @Getter(AccessLevel.PROTECTED) + private long nextMs; + + + /** + * Constructs the object. + * + * @param currentTime time with which this item is associated + * @param delayMs time, in milliseconds, before this item should be executed + */ + public WorkItem(TestTime currentTime, long delayMs) { + if (delayMs < 0) { + throw new IllegalArgumentException("invalid delay " + delayMs); + } + + this.currentTime = currentTime; + bumpNextTime(delayMs); + } + + /** + * Gets the delay until the item should be fired. + * + * @return the delay until the item should be fired + */ + public long getDelay() { + return (nextMs - currentTime.getMillis()); + } + + /** + * Determines if this work item was canceled. + * + * @return {@code true} if this item was canceled, {@code false} otherwise + */ + public boolean wasCancelled() { + return false; + } + + /** + * Bumps {@link #nextMs}, if this is a periodic task. The default method simply + * returns {@code false}. + * + * @return {@code true} if the time was bumped, {@code false} otherwise (i.e., it is + * not a periodic task) + */ + public boolean bumpNextTime() { + return false; + } + + /** + * Bumps {@link #nextMs}, setting it to the current time plus the given delay. + * + * @param delayMs time, in milliseconds, before this item should be (re-)executed + */ + protected void bumpNextTime(long delayMs) { + if (delayMs < 0) { + throw new IllegalArgumentException("negative delay"); + } + + // always bump by at least 1 millisecond + this.nextMs = currentTime.getMillis() + Math.max(1, delayMs); + } + + /** + * Interrupts the thread that created the work item, if appropriate. The default + * method does nothing. + */ + public void interrupt() { + // do nothing + } + + /** + * Determines if this item is associated with the given object. The default method + * simply returns {@code false}. + * + * @param associate candidate associate (e.g., Timer) + * @return {@code true} if the item is associated with the given object, {@code false} + * otherwise + */ + public boolean isAssociatedWith(Object associate) { + return false; + } + + /** + * Fires/executes this item. The default method does nothing. + */ + public void fire() { + // do nothing + } +} -- cgit 1.2.3-korg