aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--utils-test/pom.xml11
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java65
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java186
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java98
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java100
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java91
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java78
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java12
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java364
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java128
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java78
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java267
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java145
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java141
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java102
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java112
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java501
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java11
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java100
19 files changed, 2405 insertions, 185 deletions
diff --git a/utils-test/pom.xml b/utils-test/pom.xml
index 6ed49255..3744467f 100644
--- a/utils-test/pom.xml
+++ b/utils-test/pom.xml
@@ -37,6 +37,11 @@
<dependencies>
<dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>provided</scope>
@@ -65,6 +70,12 @@
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.openpojo</groupId>
<artifactId>openpojo</artifactId>
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java
new file mode 100644
index 00000000..79d2f226
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PeriodicItem.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import org.onap.policy.common.utils.time.TestTime;
+
+/**
+ * Work item that runs periodically.
+ */
+class PeriodicItem extends RunnableItem {
+
+ /**
+ * Time, in milliseconds, to wait between executions.
+ */
+ private final long periodMs;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime time with which this item is associated
+ * @param associate object with which this item is associated (e.g., Timer)
+ * @param delayMs time, in milliseconds, before this item should be executed
+ * @param periodMs time, in milliseconds, to delay between each execution
+ * @param action action to be performed
+ */
+ public PeriodicItem(TestTime currentTime, Object associate, long delayMs, long periodMs, Runnable action) {
+ super(currentTime, associate, delayMs, action);
+
+ if (periodMs <= 0) {
+ throw new IllegalArgumentException("invalid period " + periodMs);
+ }
+
+ this.periodMs = periodMs;
+ }
+
+ @Override
+ public boolean bumpNextTime() {
+ bumpNextTime(periodMs);
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PeriodicItem [nextMs=" + getNextMs() + ", periodMs=" + periodMs + ", associate=" + getAssociate() + "]";
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java
new file mode 100644
index 00000000..4f9b32c9
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorService.java
@@ -0,0 +1,186 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Scheduled executor service that uses {@link TestTimeMulti} to execute its tasks. Note:
+ * the invokeXxx() methods are not currently supported.
+ */
+public class PseudoScheduledExecutorService implements ScheduledExecutorService {
+ private static final String NOT_IMPLEMENTED_YET = "not implemented yet";
+
+ /**
+ * Object to be used to execute timer tasks.
+ */
+ private final TestTimeMulti currentTime;
+
+ /**
+ * {@code True} if {@link #shutdown()} or {@link #shutdownNow()} has been called,
+ * {@code false} otherwise.
+ */
+ private boolean shutdown = false;
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime object to be used to execute timer tasks
+ */
+ public PseudoScheduledExecutorService(TestTimeMulti currentTime) {
+ this.currentTime = currentTime;
+ }
+
+ /**
+ * Cancels <i>all</i> tasks that have not yet been executed.
+ */
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ currentTime.cancelItems(this);
+ }
+
+ /**
+ * Cancels <i>all</i> tasks that have not yet been executed. Does <i>not</i> interrupt
+ * any currently executing task.
+ */
+ @Override
+ public List<Runnable> shutdownNow() {
+ shutdown = true;
+ return currentTime.cancelItems(this).stream().map(item -> ((RunnableItem) item).getAction())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return shutdown;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return shutdown;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return enqueueRunOnce(0, new FutureTask<>(task));
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return enqueueRunOnce(0, new FutureTask<>(task, result));
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return enqueueRunOnce(0, new FutureTask<>(task, null));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ currentTime.enqueue(new RunnableItem(currentTime, this, 0, command));
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(command, null, false));
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return enqueueRunOnce(unit.toMillis(delay), new PseudoScheduledFuture<>(callable, false));
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(period),
+ new PseudoScheduledFuture<>(command, null, true));
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return enqueuePeriodic(unit.toMillis(initialDelay), unit.toMillis(delay),
+ new PseudoScheduledFuture<>(command, null, true));
+ }
+
+ /**
+ * Enqueues a future to be executed one time.
+ *
+ * @param delay delay until the future should be executed
+ * @param future future to be enqueued
+ * @return the future
+ */
+ private <F extends FutureTask<T>, T> F enqueueRunOnce(long delay, F future) {
+ currentTime.enqueue(new RunnableItem(currentTime, this, delay, future));
+ return future;
+ }
+
+ /**
+ * Enqueues a future to be executed periodically.
+ *
+ * @param initialDelayMs delay until the future should be executed the first time
+ * @param periodMs delay between executions of the future
+ * @param future future to be enqueued
+ * @return the future
+ */
+ private <T> ScheduledFuture<T> enqueuePeriodic(long initialDelayMs, long periodMs,
+ PseudoScheduledFuture<T> future) {
+ currentTime.enqueue(new PeriodicItem(currentTime, this, initialDelayMs, periodMs, future));
+ return future;
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java
new file mode 100644
index 00000000..6ce7bc04
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoScheduledFuture.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Scheduled future that gets its time from an associated work item.
+ *
+ * @param <T> type of result returned by the future
+ */
+class PseudoScheduledFuture<T> extends FutureTask<T> implements RunnableScheduledFuture<T> {
+
+ /**
+ * {@code True} if this task is periodic, {@code false} otherwise.
+ */
+ private final boolean periodic;
+
+ /**
+ * The work item with which this is associated.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private WorkItem workItem;
+
+ /**
+ * Constructs the object.
+ *
+ * @param runnable action to be executed
+ * @param result value to be returned by the {@link #get()} operation
+ * @param periodic {@code true} if this task is periodic, {@code false} otherwise
+ */
+ public PseudoScheduledFuture(Runnable runnable, T result, boolean periodic) {
+ super(runnable, result);
+ this.periodic = periodic;
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param callable action to be executed
+ * @param periodic {@code true} if this task is periodic, {@code false} otherwise
+ */
+ public PseudoScheduledFuture(Callable<T> callable, boolean periodic) {
+ super(callable);
+ this.periodic = periodic;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(workItem.getDelay(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed other) {
+ return Long.compare(workItem.getDelay(), other.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ @Override
+ public boolean isPeriodic() {
+ return periodic;
+ }
+
+ @Override
+ public void run() {
+ if (isPeriodic()) {
+ super.runAndReset();
+
+ } else {
+ super.run();
+ }
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java
new file mode 100644
index 00000000..e8de89a4
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoTimer.java
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * A timer that uses {@link TestTimeMulti} to execute its tasks.
+ *
+ * <p/>Note: this only supports the run() method of {@link TimerTask}; the other methods,
+ * including cancel() are not supported. However, tasks may be canceled via
+ * {@link Timer#cancel()}.
+ *
+ * <p/>Currently, this does not support any of the scheduling methods that take dates,
+ * though that could be added relatively easily.
+ */
+public class PseudoTimer extends Timer {
+ private static final String NOT_IMPLEMENTED_YET = "not implemented yet";
+
+ /**
+ * Time with which this item is associated.
+ */
+ private final TestTimeMulti currentTime;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime object to be used to execute timer tasks
+ */
+ public PseudoTimer(TestTimeMulti currentTime) {
+ // create as a daemon so jvm doesn't hang when it attempts to exit
+ super(true);
+
+ this.currentTime = currentTime;
+
+ // don't need the timer's thread
+ super.cancel();
+ }
+
+ @Override
+ public void schedule(TimerTask task, long delayMs) {
+ currentTime.enqueue(new RunnableItem(currentTime, this, delayMs, task));
+ }
+
+ @Override
+ public void schedule(TimerTask task, Date time) {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public void schedule(TimerTask task, long delayMs, long periodMs) {
+ currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task));
+ }
+
+ @Override
+ public void schedule(TimerTask task, Date firstTime, long period) {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public void scheduleAtFixedRate(TimerTask task, long delayMs, long periodMs) {
+ currentTime.enqueue(new PeriodicItem(currentTime, this, delayMs, periodMs, task));
+ }
+
+ @Override
+ public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
+ throw new UnsupportedOperationException(NOT_IMPLEMENTED_YET);
+ }
+
+ @Override
+ public void cancel() {
+ currentTime.cancelItems(this);
+ }
+
+ @Override
+ public int purge() {
+ return 0;
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java
new file mode 100644
index 00000000..54560316
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/RunnableItem.java
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.Future;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Work item that may be run/executed.
+ */
+class RunnableItem extends WorkItem {
+ private static final Logger logger = LoggerFactory.getLogger(RunnableItem.class);
+
+ /**
+ * Object with which this item is associated.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private final Object associate;
+
+ /**
+ * Action to execute.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private final Runnable action;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime time with which this item is associated
+ * @param associate object with which this item is associated (e.g., Timer)
+ * @param delayMs time, in milliseconds, before this item should be executed
+ * @param action action to be performed
+ */
+ public RunnableItem(TestTime currentTime, Object associate, long delayMs, Runnable action) {
+ super(currentTime, delayMs);
+ this.associate = associate;
+ this.action = action;
+
+ // ensure the task can properly compute its delay
+ if (action instanceof PseudoScheduledFuture) {
+ ((PseudoScheduledFuture<?>) action).setWorkItem(this);
+ }
+ }
+
+ @Override
+ public boolean isAssociatedWith(Object associate) {
+ return (this.associate == associate);
+ }
+
+ @Override
+ public boolean wasCancelled() {
+ return (action instanceof Future && ((Future<?>) action).isCancelled());
+ }
+
+ @Override
+ public void fire() {
+ try {
+ action.run();
+ } catch (RuntimeException e) {
+ logger.warn("work item {} threw an exception {}", this, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RunnableItem [nextMs=" + getNextMs() + ", associate=" + associate + "]";
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java
new file mode 100644
index 00000000..1d318803
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/SleepItem.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Work item used when a thread invokes sleep(). The thread's "sleep()" method will
+ * enqueue this item and then invoke {@link #await()} to wait for the test/controlling
+ * thread to fire it, indicating that the end of the sleep time has been reached.
+ */
+public class SleepItem extends WorkItem {
+ /**
+ * Thread that invoked "sleep()".
+ */
+ private final Thread thread;
+
+ /**
+ * This will be decremented when this work item is fired, thus releasing the
+ * "sleeping" thread to continue its work.
+ */
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime time with which this item is associated
+ * @param sleepMs time for which the thread should sleep
+ * @param thread thread that invoked "sleep()"
+ */
+ public SleepItem(TestTime currentTime, long sleepMs, Thread thread) {
+ super(currentTime, sleepMs);
+ this.thread = thread;
+ }
+
+ @Override
+ public void interrupt() {
+ thread.interrupt();
+ }
+
+ @Override
+ public void fire() {
+ latch.countDown();
+ }
+
+ /**
+ * Waits for the sleep time to be reached.
+ *
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+
+ @Override
+ public String toString() {
+ return "SleepItem [nextMs=" + getNextMs() + ", latch=" + latch + ", thread=" + thread + "]";
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
index 414c18bb..420021f3 100644
--- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* Common Utils-Test
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,7 +37,7 @@ public class TestTime extends CurrentTime {
/**
* Constructor.
- *
+ *
*/
public TestTime() {
super();
@@ -55,6 +55,8 @@ public class TestTime extends CurrentTime {
@Override
public void sleep(long sleepMs) throws InterruptedException {
- tcur.addAndGet(sleepMs);
+ if (sleepMs > 0) {
+ tcur.addAndGet(sleepMs);
+ }
}
}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
index b37e49e0..f52105ed 100644
--- a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
@@ -1,6 +1,6 @@
-/*
+/*-
* ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,184 +20,326 @@
package org.onap.policy.common.utils.time;
-import java.util.Date;
+import static org.junit.Assert.fail;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.PriorityQueue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* "Current" time, when running junit tests in multiple threads. This is intended to be
* injected into classes under test, to replace their {@link CurrentTime} objects. The
- * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
- * of "current" time forward, allowing threads to resume, as the end of their sleep time
- * is reached. Additional threads do not resume until all threads have once again entered
- * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
- * thread will not re-enter {@link #sleep(long)}.
+ * {@link #sleep(long)} method blocks until the "time" has reached the specified sleep
+ * time. A queue of work items is maintained, sorted by the time for which the items are
+ * scheduled to execute. Tasks are executed by the test/controlling thread when one of the
+ * waitXxx() methods is invoked. {@link PseudoTimer} and
+ * {@link PseudoScheduledExecutorService} add work items to the queue.
+ *
+ * <p/>
+ * This only handles relatively simple situations, though it does support multi-threaded
+ * testing.
*/
-public class TestTimeMulti extends CurrentTime {
+public class TestTimeMulti extends TestTime {
+ private static final Logger logger = LoggerFactory.getLogger(TestTimeMulti.class);
+
+ public static final String NEVER_SATISFIED = "condition was never satisfied";
+
+ /**
+ * Default value, in milliseconds, to wait for an item to be added to the queue.
+ */
+ public static final long DEFAULT_MAX_WAIT_MS = 5000L;
/**
- * Number of threads that will be sleeping simultaneously.
+ * Maximum time that the test thread should wait for something to be added to its work
+ * queue.
*/
- private int nthreads;
+ @Getter
+ private final long maxWaitMs;
/**
- * "Current" time, in milliseconds, used by tests.
+ * Queue of timer tasks to be executed, sorted by {@link WorkItem#nextMs}.
*/
- private long tcur = System.currentTimeMillis();
+ private final PriorityQueue<WorkItem> queue =
+ new PriorityQueue<>((item1, item2) -> Long.compare(item1.getNextMs(), item2.getNextMs()));
/**
- * Queue of sleeping threads waiting to be awakened.
+ * Lock used when modifying the queue.
*/
- private final PriorityQueue<Info> queue = new PriorityQueue<>();
+ private final Object updateLock = new Object();
/**
- * Used to synchronize updates.
+ * Constructs the object using the default maximum wait time.
*/
- private final Object locker = new Object();
+ public TestTimeMulti() {
+ this(DEFAULT_MAX_WAIT_MS);
+ }
/**
- * Constructor.
+ * Constructs the object.
*
- * @param nthreads number of threads that will be sleeping simultaneously
+ * @param maxWaitMs maximum time that the test thread should wait for something to be
+ * added to its work queue
*/
- public TestTimeMulti(int nthreads) {
- this.nthreads = nthreads;
+ public TestTimeMulti(long maxWaitMs) {
+ this.maxWaitMs = maxWaitMs;
}
- @Override
- public long getMillis() {
- return tcur;
+ /**
+ * Determines if the task queue is empty.
+ *
+ * @return {@code true} if the task queue is empty, {@code false} otherwise
+ */
+ public boolean isEmpty() {
+ synchronized (updateLock) {
+ purgeItems();
+ return queue.isEmpty();
+ }
}
- @Override
- public Date getDate() {
- return new Date(tcur);
+ /**
+ * Gets the number of tasks in the work queue.
+ *
+ * @return the number of tasks in the work queue
+ */
+ public int queueLength() {
+ synchronized (updateLock) {
+ purgeItems();
+ return queue.size();
+ }
}
- @Override
- public void sleep(long sleepMs) throws InterruptedException {
- if (sleepMs <= 0) {
- return;
+ /**
+ * Indicates that this will no longer be used. Interrupts any threads that are waiting
+ * for their "sleep()" to complete.
+ */
+ public void destroy() {
+ synchronized (updateLock) {
+ queue.forEach(WorkItem::interrupt);
+ queue.clear();
}
+ }
- Info info = new Info(tcur + sleepMs);
+ /**
+ * Runs a single task from the queue.
+ *
+ * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+ * than pseudo time
+ *
+ * @return {@code true} if a task was run, {@code false} if the queue was empty
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public boolean runOneTask(long waitMs) throws InterruptedException {
+ WorkItem item = pollQueue(waitMs);
+ if (item == null) {
+ return false;
+ }
- synchronized (locker) {
- queue.add(info);
+ runItem(item);
+ return true;
+ }
- if (queue.size() == nthreads) {
- // all threads are now sleeping - wake one up
- wakeThreads();
+ /**
+ * Waits for the pseudo time to reach a certain point. Executes work items until the
+ * time is reached.
+ *
+ * @param waitMs pseudo time, in milliseconds, for which to wait
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public void waitFor(long waitMs) throws InterruptedException {
+ // pseudo time for which we're waiting
+ long tend = getMillis() + waitMs;
+
+ while (getMillis() < tend) {
+ if (!runOneTask(maxWaitMs)) {
+ /*
+ * Waited the maximum poll time and nothing has happened, so we'll just
+ * bump the time directly.
+ */
+ super.sleep(tend - getMillis());
+ break;
}
}
-
- // this MUST happen outside of the "synchronized" block
- info.await();
}
/**
- * Indicates that a thread has terminated or that it will no longer be invoking
- * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
- * removing the terminated thread.
+ * Waits for a condition to become true. Executes work items until the given condition
+ * is true.
*
- * @throws IllegalStateException if the queue is already full
+ * @param condition condition to be checked
*/
- public void threadCompleted() {
- synchronized (locker) {
- int sz = queue.size();
- if (sz >= nthreads) {
- throw new IllegalStateException("too many threads still sleeping");
- }
+ public void waitUntil(Callable<Boolean> condition) {
+ try {
+ // real time for which we're waiting
+ long realEnd = System.currentTimeMillis() + maxWaitMs;
- --nthreads;
+ while (System.currentTimeMillis() < realEnd) {
+ if (condition.call()) {
+ return;
+ }
- if (sz == nthreads) {
- // after removing terminated thread - queue is now full; awaken something
- wakeThreads();
+ runOneTask(100);
}
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("interrupted while waiting for condition", e);
+ fail("interrupted while waiting for condition: " + e.getMessage());
+
+ } catch (Exception e) {
+ logger.error("condition evaluator threw an exception", e);
+ fail("condition evaluator threw an exception: " + e.getMessage());
}
+
+ fail(NEVER_SATISFIED);
}
/**
- * Advances the "current" time and awakens any threads sleeping until that time.
+ * Waits for a condition to become true. Executes work items until the given condition
+ * is true or the maximum wait time is reached.
+ *
+ * @param twait maximum, pseudo time to wait
+ * @param units time units represented by "twait"
+ * @param condition condition to be checked
*/
- private void wakeThreads() {
- Info info = queue.poll();
- if (info == null) {
- return;
- }
+ public void waitUntil(long twait, TimeUnit units, Callable<Boolean> condition) {
+ // pseudo time for which we're waiting
+ long tend = getMillis() + units.toMillis(twait);
- tcur = info.getAwakenAtMs();
- info.wake();
+ waitUntil(() -> {
+ if (getMillis() >= tend) {
+ fail(NEVER_SATISFIED);
+ }
- while ((info = queue.poll()) != null) {
- if (tcur == info.getAwakenAtMs()) {
- info.wake();
+ return condition.call();
+ });
+ }
- } else {
- // not ready to wake this thread - put it back in the queue
- queue.add(info);
- break;
+ /**
+ * Gets one item from the work queue.
+ *
+ * @param waitMs time, in milliseconds, for which to wait. This is "real" time rather
+ * than pseudo time
+ * @return the first item in the queue, or {@code null} if no item was added to the
+ * queue before the wait time expired
+ * @throws InterruptedException if the current thread was interrupted
+ */
+ private WorkItem pollQueue(long waitMs) throws InterruptedException {
+ long realEnd = System.currentTimeMillis() + waitMs;
+ WorkItem work;
+
+ synchronized (updateLock) {
+ while ((work = queue.poll()) == null) {
+ updateLock.wait(Math.max(1, realEnd - System.currentTimeMillis()));
+
+ if (queue.isEmpty() && System.currentTimeMillis() >= realEnd) {
+ return null;
+ }
}
}
+
+ return work;
}
/**
- * Info about a sleeping thread.
+ * Runs a work item.
+ *
+ * @param work work item to be run
+ * @throws InterruptedException if the current thread was interrupted
*/
- private static class Info implements Comparable<Info> {
-
- /**
- * Time, in milliseconds, at which the associated thread should awaken.
- */
- private final long awakenAtMs;
+ private void runItem(WorkItem work) throws InterruptedException {
+ if (work.wasCancelled()) {
+ logger.info("work item was canceled {}", work);
+ return;
+ }
- /**
- * This is triggered when the associated thread should awaken.
- */
- private final CountDownLatch latch = new CountDownLatch(1);
+ // update the pseudo time
+ super.sleep(work.getNextMs() - getMillis());
- /**
- * Constructor.
- *
- * @param awakenAtMs time, in milliseconds, at which the associated thread should
- * awaken
+ /*
+ * Add it back into the queue if appropriate, in case cancel() is called while
+ * it's executing.
*/
- public Info(long awakenAtMs) {
- this.awakenAtMs = awakenAtMs;
+ if (work.bumpNextTime()) {
+ logger.info("re-enqueuing work item");
+ enqueue(work);
}
- public long getAwakenAtMs() {
- return awakenAtMs;
+ logger.info("fire work item {}", work);
+ work.fire();
+ }
+
+ @Override
+ public void sleep(long sleepMs) throws InterruptedException {
+ if (sleepMs <= 0) {
+ return;
}
- /**
- * Awakens the associated thread by decrementing its latch.
- */
- public void wake() {
- latch.countDown();
+ SleepItem item = new SleepItem(this, sleepMs, Thread.currentThread());
+ enqueue(item);
+
+ // wait for the item to fire
+ logger.info("sleeping {}", item);
+ item.await();
+ logger.info("done sleeping {}", Thread.currentThread());
+ }
+
+ /**
+ * Adds an item to the {@link #queue}.
+ *
+ * @param item item to be added
+ */
+ protected void enqueue(WorkItem item) {
+ logger.info("enqueue work item {}", item);
+ synchronized (updateLock) {
+ queue.add(item);
+ updateLock.notify();
}
+ }
- /**
- * Blocks the current thread until awakened (i.e., until its latch is
- * decremented).
- *
- * @throws InterruptedException can be interrupted
- */
- public void await() throws InterruptedException {
- latch.await();
+ /**
+ * Cancels work items by removing them from the queue if they're associated with the
+ * specified object.
+ *
+ * @param associate object whose associated items are to be cancelled
+ * @return list of items that were canceled
+ */
+ protected List<WorkItem> cancelItems(Object associate) {
+ List<WorkItem> items = new LinkedList<>();
+
+ synchronized (updateLock) {
+ Iterator<WorkItem> iter = queue.iterator();
+ while (iter.hasNext()) {
+ WorkItem item = iter.next();
+ if (item.isAssociatedWith(associate)) {
+ iter.remove();
+ items.add(item);
+ }
+ }
}
- @Override
- public int compareTo(Info object) {
- int diff = Long.compare(awakenAtMs, object.awakenAtMs);
+ return items;
+ }
- // this assumes that Object.toString() is unique for each Info object
- if (diff == 0) {
- diff = this.toString().compareTo(object.toString());
+ /**
+ * Purges work items that are known to have been canceled. (Does not remove canceled
+ * TimerTasks, as there is no way via the public API to determine if the task has been
+ * canceled.)
+ */
+ public void purgeItems() {
+ synchronized (updateLock) {
+ Iterator<WorkItem> iter = queue.iterator();
+ while (iter.hasNext()) {
+ if (iter.next().wasCancelled()) {
+ iter.remove();
+ }
}
- return diff;
}
-
}
}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java
new file mode 100644
index 00000000..af3d5d7e
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/WorkItem.java
@@ -0,0 +1,128 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.onap.policy.common.utils.time.TestTime;
+
+/**
+ * Work item to be executed at some time.
+ */
+class WorkItem {
+
+ /**
+ * Pseudo time with which this item is associated.
+ */
+ private final TestTime currentTime;
+
+ /**
+ * Time, in milliseconds, when the timer should fire next.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private long nextMs;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param currentTime time with which this item is associated
+ * @param delayMs time, in milliseconds, before this item should be executed
+ */
+ public WorkItem(TestTime currentTime, long delayMs) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("invalid delay " + delayMs);
+ }
+
+ this.currentTime = currentTime;
+ bumpNextTime(delayMs);
+ }
+
+ /**
+ * Gets the delay until the item should be fired.
+ *
+ * @return the delay until the item should be fired
+ */
+ public long getDelay() {
+ return (nextMs - currentTime.getMillis());
+ }
+
+ /**
+ * Determines if this work item was canceled.
+ *
+ * @return {@code true} if this item was canceled, {@code false} otherwise
+ */
+ public boolean wasCancelled() {
+ return false;
+ }
+
+ /**
+ * Bumps {@link #nextMs}, if this is a periodic task. The default method simply
+ * returns {@code false}.
+ *
+ * @return {@code true} if the time was bumped, {@code false} otherwise (i.e., it is
+ * not a periodic task)
+ */
+ public boolean bumpNextTime() {
+ return false;
+ }
+
+ /**
+ * Bumps {@link #nextMs}, setting it to the current time plus the given delay.
+ *
+ * @param delayMs time, in milliseconds, before this item should be (re-)executed
+ */
+ protected void bumpNextTime(long delayMs) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("negative delay");
+ }
+
+ // always bump by at least 1 millisecond
+ this.nextMs = currentTime.getMillis() + Math.max(1, delayMs);
+ }
+
+ /**
+ * Interrupts the thread that created the work item, if appropriate. The default
+ * method does nothing.
+ */
+ public void interrupt() {
+ // do nothing
+ }
+
+ /**
+ * Determines if this item is associated with the given object. The default method
+ * simply returns {@code false}.
+ *
+ * @param associate candidate associate (e.g., Timer)
+ * @return {@code true} if the item is associated with the given object, {@code false}
+ * otherwise
+ */
+ public boolean isAssociatedWith(Object associate) {
+ return false;
+ }
+
+ /**
+ * Fires/executes this item. The default method does nothing.
+ */
+ public void fire() {
+ // do nothing
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java
new file mode 100644
index 00000000..3e64edf3
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PeriodicItemTest.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class PeriodicItemTest {
+ private static final long DELAY_MS = 100L;
+ private static final long PERIOD_MS = 200L;
+ private static final Object ASSOCIATE = new Object();
+
+ private TestTime currentTime;
+ private int count;
+ private PeriodicItem item;
+
+ /**
+ * Sets up objects, including {@link #item}.
+ */
+ @Before
+ public void setUp() {
+ currentTime = new TestTime();
+ count = 0;
+ item = new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, PERIOD_MS, () -> count++);
+ }
+
+ @Test
+ public void testBumpNextTime() {
+ assertTrue(item.bumpNextTime());
+ assertEquals(currentTime.getMillis() + PERIOD_MS, item.getNextMs());
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(item.toString());
+ }
+
+ @Test
+ public void testPeriodicItem() {
+ assertSame(ASSOCIATE, item.getAssociate());
+ assertNotNull(item.getAction());
+ assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs());
+
+ item.getAction().run();
+ assertEquals(1, count);
+
+ // invalid period
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, 0, () -> count++));
+ assertThatIllegalArgumentException()
+ .isThrownBy(() -> new PeriodicItem(currentTime, ASSOCIATE, DELAY_MS, -1, () -> count++));
+ }
+
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java
new file mode 100644
index 00000000..70820c44
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledExecutorServiceTest.java
@@ -0,0 +1,267 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PseudoScheduledExecutorServiceTest {
+ private static final long DELAY_MS = 100L;
+ private static final long PERIOD_MS = 200L;
+
+ private int ran;
+ private int called;
+ private TestTimeMulti currentTime;
+ private PseudoScheduledExecutorService svc;
+
+ /**
+ * Sets up objects, including {@link #svc}.
+ */
+ @Before
+ public void setUp() {
+ ran = 0;
+ called = 0;
+ currentTime = new TestTimeMulti();
+ svc = new PseudoScheduledExecutorService(currentTime);
+ }
+
+ @Test
+ public void testShutdown() {
+ // submit some tasks
+ svc.submit(new MyRun());
+ svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS);
+
+ svc.shutdown();
+ assertTrue(svc.isShutdown());
+
+ // task should have been removed
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testShutdownNow() {
+ // submit some tasks
+ svc.submit(new MyRun());
+ svc.schedule(new MyRun(), 1L, TimeUnit.SECONDS);
+
+ svc.shutdownNow();
+ assertTrue(svc.isShutdown());
+
+ // task should have been removed
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testIsShutdown_testIsTerminated() {
+ assertFalse(svc.isShutdown());
+ assertFalse(svc.isTerminated());
+
+ svc.shutdown();
+ assertTrue(svc.isShutdown());
+ assertTrue(svc.isTerminated());
+ }
+
+ @Test
+ public void testAwaitTermination() throws InterruptedException {
+ assertFalse(svc.awaitTermination(1L, TimeUnit.SECONDS));
+
+ svc.shutdown();
+ assertTrue(svc.awaitTermination(1L, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testSubmitCallableOfT() throws Exception {
+ Future<Integer> future = svc.submit(new MyCallable());
+ currentTime.runOneTask(0);
+
+ assertEquals(1, called);
+ assertEquals(1, future.get().intValue());
+
+ // nothing re-queued
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testSubmitRunnableT() throws Exception {
+ Future<Integer> future = svc.submit(new MyRun(), 2);
+ currentTime.runOneTask(0);
+
+ assertEquals(1, ran);
+ assertEquals(2, future.get().intValue());
+
+ // nothing re-queued
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testSubmitRunnable() throws Exception {
+ assertNotNull(svc.submit(new MyRun()));
+ currentTime.runOneTask(0);
+
+ assertEquals(1, ran);
+
+ // nothing re-queued
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testInvokeAllCollectionOfQextendsCallableOfT() {
+ assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList()))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testInvokeAllCollectionOfQextendsCallableOfTLongTimeUnit() {
+ assertThatThrownBy(() -> svc.invokeAll(Collections.emptyList(), 1, TimeUnit.MILLISECONDS))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testInvokeAnyCollectionOfQextendsCallableOfT() {
+ assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList()))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testInvokeAnyCollectionOfQextendsCallableOfTLongTimeUnit() {
+ assertThatThrownBy(() -> svc.invokeAny(Collections.emptyList(), 1, TimeUnit.MILLISECONDS))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testExecute() throws InterruptedException {
+ svc.execute(new MyRun());
+ currentTime.runOneTask(0);
+
+ assertEquals(1, ran);
+
+ // nothing re-queued
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testScheduleRunnableLongTimeUnit() throws InterruptedException {
+ assertNotNull(svc.schedule(new MyRun(), DELAY_MS, TimeUnit.MILLISECONDS));
+
+ assertEquals(DELAY_MS, oneTaskElapsedTime());
+ assertEquals(1, ran);
+
+ // verify nothing re-scheduled
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testScheduleCallableOfVLongTimeUnit() throws Exception {
+ ScheduledFuture<Integer> future = svc.schedule(new MyCallable(), DELAY_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(DELAY_MS, oneTaskElapsedTime());
+ assertEquals(1, called);
+ assertEquals(1, future.get().intValue());
+
+ // verify nothing re-scheduled
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testScheduleAtFixedRate() throws InterruptedException {
+ final ScheduledFuture<?> future =
+ svc.scheduleAtFixedRate(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(DELAY_MS, oneTaskElapsedTime());
+ assertEquals(1, ran);
+
+ assertEquals(PERIOD_MS, oneTaskElapsedTime());
+ assertEquals(2, ran);
+
+ assertEquals(PERIOD_MS, oneTaskElapsedTime());
+ assertEquals(3, ran);
+
+ future.cancel(false);
+
+ // should not actually execute
+ assertEquals(0, oneTaskElapsedTime());
+ assertEquals(3, ran);
+
+ // verify nothing re-scheduled
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testScheduleWithFixedDelay() throws InterruptedException {
+ final ScheduledFuture<?> future =
+ svc.scheduleWithFixedDelay(new MyRun(), DELAY_MS, PERIOD_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(DELAY_MS, oneTaskElapsedTime());
+ assertEquals(1, ran);
+
+ assertEquals(PERIOD_MS, oneTaskElapsedTime());
+ assertEquals(2, ran);
+
+ assertEquals(PERIOD_MS, oneTaskElapsedTime());
+ assertEquals(3, ran);
+
+ future.cancel(false);
+
+ // should not actually execute
+ assertEquals(0, oneTaskElapsedTime());
+ assertEquals(3, ran);
+
+ // verify nothing re-scheduled
+ assertTrue(currentTime.isEmpty());
+ }
+
+ /**
+ * Runs a single task and returns its elapsed (pseudo) time.
+ *
+ * @return the elapsed time taken to run the task
+ * @throws InterruptedException if the thread is interrupted
+ */
+ private long oneTaskElapsedTime() throws InterruptedException {
+ final long tbegin = currentTime.getMillis();
+ currentTime.runOneTask(0);
+ return (currentTime.getMillis() - tbegin);
+ }
+
+ private class MyRun implements Runnable {
+ @Override
+ public void run() {
+ ++ran;
+ }
+ }
+
+ private class MyCallable implements Callable<Integer> {
+ @Override
+ public Integer call() {
+ return ++called;
+ }
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java
new file mode 100644
index 00000000..e23bbd29
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoScheduledFutureTest.java
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class PseudoScheduledFutureTest {
+ private static final long DELAY_MS = 1000L;
+
+ private int count;
+
+ @Mock
+ private WorkItem work;
+
+ private PseudoScheduledFuture<Integer> future;
+
+ /**
+ * Sets up objects, including {@link #future}.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ when(work.getDelay()).thenReturn(DELAY_MS);
+
+ count = 0;
+ future = new PseudoScheduledFuture<>(() -> ++count, true);
+ future.setWorkItem(work);
+ }
+
+ @Test
+ public void testRun() {
+ // verify with a periodic task - should execute twice
+ count = 0;
+ future.run();
+ future.run();
+ assertEquals(2, count);
+
+ // verify with an aperiodic task - should only execute once
+ future = new PseudoScheduledFuture<>(() -> ++count, false);
+ count = 0;
+ future.run();
+ future.run();
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testPseudoScheduledFutureRunnableTBoolean() throws Exception {
+ final Integer result = 100;
+ future = new PseudoScheduledFuture<>(() -> ++count, result, true);
+ assertTrue(future.isPeriodic());
+ future.run();
+ future.run();
+ assertEquals(2, count);
+
+ // verify with aperiodic constructor
+ future = new PseudoScheduledFuture<>(() -> ++count, result, false);
+ count = 0;
+ assertFalse(future.isPeriodic());
+ future.run();
+ future.run();
+ assertEquals(1, count);
+ assertEquals(result, future.get());
+ }
+
+ @Test
+ public void testPseudoScheduledFutureCallableOfTBoolean() throws Exception {
+ assertTrue(future.isPeriodic());
+ future.run();
+ future.run();
+ assertEquals(2, count);
+
+ // verify with aperiodic constructor
+ future = new PseudoScheduledFuture<>(() -> ++count, false);
+ count = 0;
+ assertFalse(future.isPeriodic());
+ future.run();
+ assertEquals(1, future.get().intValue());
+ future.run();
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testGetDelay() {
+ assertEquals(DELAY_MS, future.getDelay(TimeUnit.MILLISECONDS));
+ assertEquals(TimeUnit.MILLISECONDS.toSeconds(DELAY_MS), future.getDelay(TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testCompareTo() {
+ Delayed delayed = mock(Delayed.class);
+ when(delayed.getDelay(TimeUnit.MILLISECONDS)).thenReturn(DELAY_MS + 1);
+
+ assertTrue(future.compareTo(delayed) < 0);
+ }
+
+ @Test
+ public void testIsPeriodic() {
+ assertTrue(future.isPeriodic());
+ assertFalse(new PseudoScheduledFuture<>(() -> ++count, false).isPeriodic());
+ }
+
+ @Test
+ public void testGetWorkItem() {
+ assertSame(work, future.getWorkItem());
+ }
+
+ @Test
+ public void testSetWorkItem() {
+ work = mock(WorkItem.class);
+ future.setWorkItem(work);
+ assertSame(work, future.getWorkItem());
+ }
+
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java
new file mode 100644
index 00000000..49710538
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoTimerTest.java
@@ -0,0 +1,141 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.TimerTask;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PseudoTimerTest {
+ private static final long DELAY_MS = 1000L;
+ private static final long PERIOD_MS = 2000L;
+
+ private int count;
+ private TestTimeMulti currentTime;
+ private PseudoTimer timer;
+
+ /**
+ * Sets up objects, including {@link #timer}.
+ */
+ @Before
+ public void setUp() {
+ count = 0;
+ currentTime = new TestTimeMulti();
+ timer = new PseudoTimer(currentTime);
+ }
+
+ @Test
+ public void testCancel() {
+ // schedule two tasks
+ timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS);
+ timer.schedule(new MyTask(), DELAY_MS);
+
+ assertFalse(currentTime.isEmpty());
+
+ // cancel the timer
+ timer.cancel();
+
+ // invoke it again to ensure no exception
+ timer.cancel();
+ }
+
+ @Test
+ public void testPurge() {
+ assertEquals(0, timer.purge());
+ assertEquals(0, timer.purge());
+ }
+
+ @Test
+ public void testScheduleTimerTaskLong() throws InterruptedException {
+ timer.schedule(new MyTask(), DELAY_MS);
+ assertFalse(currentTime.isEmpty());
+
+ // wait for the initial delay
+ currentTime.waitFor(DELAY_MS);
+ assertEquals(1, count);
+
+ assertTrue(currentTime.isEmpty());
+ }
+
+ @Test
+ public void testScheduleTimerTaskDate() {
+ assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date()))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testScheduleTimerTaskLongLong() throws InterruptedException {
+ timer.schedule(new MyTask(), DELAY_MS, PERIOD_MS);
+ assertFalse(currentTime.isEmpty());
+
+ // wait for the initial delay plus a couple of additional periods
+ final long tbegin = System.currentTimeMillis();
+ currentTime.waitFor(DELAY_MS + PERIOD_MS * 2);
+ assertTrue(count >= 3);
+
+ assertFalse(currentTime.isEmpty());
+
+ // this thread should not have blocked while waiting
+ assertTrue(System.currentTimeMillis() < tbegin + 2000);
+ }
+
+ @Test
+ public void testScheduleTimerTaskDateLong() {
+ assertThatThrownBy(() -> timer.schedule(new MyTask(), new Date(), 1L))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testScheduleAtFixedRateTimerTaskLongLong() throws InterruptedException {
+ timer.scheduleAtFixedRate(new MyTask(), DELAY_MS, PERIOD_MS);
+ assertFalse(currentTime.isEmpty());
+
+ // wait for the initial delay plus a couple of additional periods
+ final long tbegin = System.currentTimeMillis();
+ currentTime.waitFor(DELAY_MS + PERIOD_MS * 2);
+ assertTrue(count >= 3);
+
+ assertFalse(currentTime.isEmpty());
+
+ // this thread should not have blocked while waiting
+ assertTrue(System.currentTimeMillis() < tbegin + 2000);
+ }
+
+ @Test
+ public void testScheduleAtFixedRateTimerTaskDateLong() {
+ assertThatThrownBy(() -> timer.scheduleAtFixedRate(new MyTask(), new Date(), 1L))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ private class MyTask extends TimerTask {
+ @Override
+ public void run() {
+ ++count;
+ }
+
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java
new file mode 100644
index 00000000..e7bbd018
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/RunnableItemTest.java
@@ -0,0 +1,102 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.FutureTask;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RunnableItemTest {
+ private static final long DELAY_MS = 100L;
+ private static final Object ASSOCIATE = new Object();
+
+ private TestTime currentTime;
+ private int count;
+ private RunnableItem item;
+
+ /**
+ * Sets up objects, including {@link #item}.
+ */
+ @Before
+ public void setUp() {
+ currentTime = new TestTime();
+ count = 0;
+ item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> count++);
+ }
+
+ @Test
+ public void testWasCancelled() {
+ assertFalse(item.wasCancelled());
+
+ FutureTask<Object> future = new FutureTask<>(() -> count++);
+ item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, future);
+ assertFalse(item.wasCancelled());
+
+ future.cancel(true);
+ assertTrue(item.wasCancelled());
+ }
+
+ @Test
+ public void testIsAssociatedWith() {
+ assertFalse(item.isAssociatedWith(this));
+ assertTrue(item.isAssociatedWith(ASSOCIATE));
+ }
+
+ @Test
+ public void testFire() {
+ item.fire();
+ assertEquals(1, count);
+
+ // verify that fire() works even if the action throws an exception
+ new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, () -> {
+ throw new RuntimeException("expected exception");
+ }).fire();
+ }
+
+ @Test
+ public void testRunnableItem_testGetAssociate_testGetAction() {
+ assertSame(ASSOCIATE, item.getAssociate());
+ assertNotNull(item.getAction());
+ assertEquals(currentTime.getMillis() + DELAY_MS, item.getNextMs());
+
+ item.getAction().run();
+ assertEquals(1, count);
+
+ // verify that work item is set when constructed with a future
+ PseudoScheduledFuture<Integer> schedFuture = new PseudoScheduledFuture<>(() -> count + 1, false);
+ item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, schedFuture);
+ assertSame(item, schedFuture.getWorkItem());
+
+ // verify that work item is NOT set when constructed with a plain future
+ item = new RunnableItem(currentTime, ASSOCIATE, DELAY_MS, new FutureTask<>(() -> count + 1));
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(item.toString());
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java
new file mode 100644
index 00000000..dbd54781
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/SleepItemTest.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SleepItemTest {
+ private static final int SLEEP_MS = 250;
+ private static final long MAX_WAIT_MS = 5000L;
+
+ private TestTime currentTime;
+ private Thread thread;
+ private CountDownLatch started;
+ private CountDownLatch finished;
+ private volatile InterruptedException threadEx;
+ private SleepItem item;
+
+ /**
+ * Sets up objects, including {@link #item}.
+ */
+ @Before
+ public void setUp() {
+ currentTime = new TestTime();
+ started = new CountDownLatch(1);
+ finished = new CountDownLatch(1);
+
+ thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ started.countDown();
+ item.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ threadEx = e;
+ }
+
+ finished.countDown();
+ }
+ };
+ thread.setDaemon(true);
+
+ item = new SleepItem(currentTime, SLEEP_MS, thread);
+ }
+
+ @Test
+ public void testInterrupt() throws InterruptedException {
+ startThread();
+
+ item.interrupt();
+
+ assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+ assertNotNull(threadEx);
+ }
+
+ @Test
+ public void testFire_testAwait() throws InterruptedException {
+ startThread();
+
+ // verify that it hasn't finished yet
+ thread.join(250);
+ assertTrue(finished.getCount() > 0);
+
+ // now fire it and verify that it finishes
+ item.fire();
+ assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+
+ assertNull(threadEx);
+ }
+
+ @Test
+ public void testSleepItem() {
+ assertEquals(currentTime.getMillis() + SLEEP_MS, item.getNextMs());
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(item.toString());
+ }
+
+
+ private void startThread() throws InterruptedException {
+ thread.start();
+ started.await();
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
index f17235a2..8b6501a7 100644
--- a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
@@ -1,6 +1,6 @@
-/*
+/*-
* ============LICENSE_START=======================================================
- * Common Utils-Test
+ * ONAP
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
@@ -20,109 +20,478 @@
package org.onap.policy.common.utils.time;
-import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Before;
import org.junit.Test;
-/**
- * Class to test TestTimeMulti.
- */
public class TestTimeMultiTest {
+ private static final long SHORT_WAIT_MS = 100L;
+ private static final long DELAY_MS = 500L;
+ private static final long MAX_WAIT_MS = 5000L;
- private static final int NTHREADS = 10;
- private static final int NTIMES = 100;
- private static final long WAIT_SEC = 5L;
- private static final long MIN_SLEEP_MS = 5L;
+ private TestTimeMulti multi;
- private TestTimeMulti ttm;
- private Semaphore done;
+ @Before
+ public void setUp() {
+ multi = new TestTimeMulti();
+ }
@Test
- public void test() throws Exception {
- ttm = new TestTimeMulti(NTHREADS);
- done = new Semaphore(0);
+ public void testSleep() throws InterruptedException {
+ // negative sleep time
+ final long tbegin = multi.getMillis();
+ MyThread thread = new MyThread(-5);
+ thread.start();
- final long tbeg = ttm.getMillis();
+ // should complete without creating a work item
+ assertTrue(thread.await());
+ assertNull(thread.ex);
- // create threads
- List<MyThread> threads = new ArrayList<>(NTHREADS);
- for (int x = 0; x < NTHREADS; ++x) {
- threads.add(new MyThread(x + MIN_SLEEP_MS));
- }
+ // time should not have changed
+ assertEquals(tbegin, multi.getMillis());
+
+
+ // positive sleep time
+ thread = new MyThread(DELAY_MS);
+ thread.start();
+
+ // must execute the SleepItem
+ multi.runOneTask(MAX_WAIT_MS);
+
+ assertTrue(multi.isEmpty());
+ assertTrue(thread.await());
+ assertNull(thread.ex);
+
+ // time SHOULD HAVE changed
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+ }
+
+ @Test
+ public void testTestTimeMulti() {
+ assertTrue(multi.getMaxWaitMs() > 0);
+ }
+
+ @Test
+ public void testTestTimeMultiLong() {
+ assertEquals(200, new TestTimeMulti(200).getMaxWaitMs());
+ }
+
+ @Test
+ public void testIsEmpty_testQueueLength() throws InterruptedException {
+ assertTrue(multi.isEmpty());
+
+ // queue up two items
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ assertFalse(multi.isEmpty());
+ assertEquals(1, multi.queueLength());
+
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ assertEquals(2, multi.queueLength());
+
+ // run one - should not be empty yet
+ multi.runOneTask(0);
+ assertFalse(multi.isEmpty());
+ assertEquals(1, multi.queueLength());
+
+ // run the other - should be empty now
+ multi.runOneTask(0);
+ assertTrue(multi.isEmpty());
+ assertEquals(0, multi.queueLength());
+ }
+
+ @Test
+ public void testDestroy() throws InterruptedException {
+ // this won't interrupt
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ // these will interrupt
+ AtomicBoolean interrupted1 = new AtomicBoolean(false);
+ multi.enqueue(new WorkItem(multi, DELAY_MS) {
+ @Override
+ public void interrupt() {
+ interrupted1.set(true);
+ }
+ });
+
+ AtomicBoolean interrupted2 = new AtomicBoolean(false);
+ multi.enqueue(new WorkItem(multi, DELAY_MS) {
+ @Override
+ public void interrupt() {
+ interrupted2.set(true);
+ }
+ });
+
+ multi.destroy();
+ assertTrue(multi.isEmpty());
+
+ assertTrue(interrupted1.get());
+ assertTrue(interrupted2.get());
+ }
+
+ @Test
+ public void testRunOneTask() throws InterruptedException {
+ // nothing in the queue yet
+ assertFalse(multi.runOneTask(0));
+
+ // put something in the queue
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ final long tbegin = multi.getMillis();
+ assertTrue(multi.runOneTask(MAX_WAIT_MS));
+
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+
+ // nothing in the queue now
+ assertFalse(multi.runOneTask(0));
+
+ // time doesn't change
+ assertEquals(tbegin + DELAY_MS, multi.getMillis());
+ }
+
+ @Test
+ public void testWaitFor() throws InterruptedException {
+ // queue up a couple of items
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long realBegin = System.currentTimeMillis();
+ final long tbegin = multi.getMillis();
+ multi.waitFor(DELAY_MS * 2 - 1);
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // minimal real time should have elapsed
+ assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitFor_EmptyQueue() throws InterruptedException {
+ multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+ final long realBegin = System.currentTimeMillis();
+ final long tbegin = multi.getMillis();
+
+ multi.waitFor(2);
+
+ assertEquals(tbegin + 2, multi.getMillis());
+ assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitUntilCallable() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ AtomicInteger count = new AtomicInteger(0);
+ multi.waitUntil(() -> count.incrementAndGet() == 3);
+
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // should still be one item left in the queue
+ assertEquals(1, multi.queueLength());
+ assertEquals(3, count.get());
+ }
+
+ @Test
+ public void testWaitUntilCallable_InterruptEx() throws InterruptedException {
+ multi = new TestTimeMulti();
+
+ Callable<Boolean> callable = () -> {
+ throw new InterruptedException("expected exception");
+ };
+
+ LinkedBlockingQueue<Error> errors = new LinkedBlockingQueue<>();
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ multi.waitUntil(callable);
+ } catch (Error ex) {
+ errors.add(ex);
+ }
+ }
+ };
+
+ thread.start();
+
+ Error ex = errors.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ assertNotNull(ex);
+ assertEquals("interrupted while waiting for condition: expected exception", ex.getMessage());
+ }
+
+ @Test
+ public void testWaitUntilCallable_ConditionThrowsEx() throws InterruptedException {
+ multi = new TestTimeMulti();
+
+ Callable<Boolean> callable = () -> {
+ throw new IllegalStateException("expected exception");
+ };
+
+ final long realBegin = System.currentTimeMillis();
+ assertThatThrownBy(() -> multi.waitUntil(callable))
+ .hasMessage("condition evaluator threw an exception: expected exception");
+
+ assertTrue(System.currentTimeMillis() < realBegin + TestTimeMulti.DEFAULT_MAX_WAIT_MS);
+ }
+
+ @Test
+ public void testWaitUntilCallable_NeverSatisfied() throws InterruptedException {
+ multi = new TestTimeMulti(SHORT_WAIT_MS);
+
+ final long realBegin = System.currentTimeMillis();
+ assertThatThrownBy(() -> multi.waitUntil(() -> false))
+ .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+ assertTrue(System.currentTimeMillis() >= realBegin + SHORT_WAIT_MS);
+ }
- // launch threads
- for (MyThread thr : threads) {
- thr.start();
+ @Test
+ public void testWaitUntilLongTimeUnitCallable() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ AtomicInteger count = new AtomicInteger(0);
+ multi.waitUntil(DELAY_MS * 4, TimeUnit.MILLISECONDS, () -> count.incrementAndGet() == 3);
+
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+
+ // should still be one item left in the queue
+ assertEquals(1, multi.queueLength());
+ assertEquals(3, count.get());
+ }
+
+ @Test
+ public void testWaitUntilLongTimeUnitCallable_PseudoTimeExpires() throws InterruptedException {
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 2));
+ multi.enqueue(new WorkItem(multi, DELAY_MS * 3));
+
+ final long tbegin = multi.getMillis();
+ assertThatThrownBy(() -> multi.waitUntil(DELAY_MS * 2 - 1, TimeUnit.MILLISECONDS, () -> false))
+ .hasMessage(TestTimeMulti.NEVER_SATISFIED);
+ assertEquals(tbegin + DELAY_MS * 2, multi.getMillis());
+ }
+
+ @Test
+ public void testRunItem() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+ multi.enqueue(new MyWorkItem(fired));
+
+ assertTrue(multi.runOneTask(1));
+
+ // should no longer be in the queue
+ assertTrue(multi.isEmpty());
+
+ // should have been fired
+ assertTrue(fired.get());
+ }
+
+ @Test
+ public void testRunItem_Rescheduled() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ multi.enqueue(new MyWorkItem(fired) {
+ @Override
+ public boolean bumpNextTime() {
+ bumpNextTime(DELAY_MS);
+ return true;
+ }
+ });
+
+ assertTrue(multi.runOneTask(1));
+
+ // should still be in the queue
+ assertEquals(1, multi.queueLength());
+
+ // should have been fired
+ assertTrue(fired.get());
+ }
+
+ @Test
+ public void testRunItem_Canceled() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ multi.enqueue(new MyWorkItem(fired) {
+ @Override
+ public boolean wasCancelled() {
+ return true;
+ }
+
+ @Override
+ public boolean bumpNextTime() {
+ return true;
+ }
+ });
+
+ final long tbegin = multi.getMillis();
+ assertTrue(multi.runOneTask(1));
+
+ // time should be unchanged
+ assertEquals(tbegin, multi.getMillis());
+
+ assertTrue(multi.isEmpty());
+
+ // should not have been fired
+ assertFalse(fired.get());
+ }
+
+ @Test
+ public void testEnqueue() throws InterruptedException {
+ CountDownLatch started = new CountDownLatch(1);
+ CountDownLatch finished = new CountDownLatch(1);
+ AtomicReference<InterruptedException> ex = new AtomicReference<>();
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ started.countDown();
+
+ try {
+ multi.runOneTask(DELAY_MS * 3);
+ } catch (InterruptedException e) {
+ ex.set(e);
+ }
+
+ finished.countDown();
+ }
+ };
+
+ thread.start();
+
+ // wait for thread to start
+ started.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+
+ // wait for it to block on the lock
+ await().atMost(MAX_WAIT_MS, TimeUnit.MILLISECONDS).until(() -> thread.getState() == Thread.State.TIMED_WAITING);
+
+ // add an item to the queue - should trigger the thread to continue
+ multi.enqueue(new WorkItem(multi, DELAY_MS));
+
+ assertTrue(finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS));
+ assertNull(ex.get());
+ }
+
+ @Test
+ public void testCancelItems() throws InterruptedException {
+ AtomicBoolean fired1 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired1));
+
+ AtomicBoolean fired2 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired2));
+ multi.enqueue(new MyWorkItem(fired2));
+
+ AtomicBoolean fired3 = new AtomicBoolean();
+ multi.enqueue(new MyWorkItem(fired3));
+
+ // cancel some
+ multi.cancelItems(fired2);
+
+ // should have only canceled two of them
+ assertEquals(2, multi.queueLength());
+
+ // fire both
+ multi.runOneTask(0);
+ multi.runOneTask(0);
+
+ // these should have fired
+ assertTrue(fired1.get());
+ assertTrue(fired3.get());
+
+ // these should NOT have fired
+ assertFalse(fired2.get());
+ }
+
+ @Test
+ public void testPurgeItems() throws InterruptedException {
+ AtomicBoolean fired = new AtomicBoolean();
+
+ // queue up two that are canceled, one that is not
+ multi.enqueue(new MyWorkItem(true));
+ multi.enqueue(new MyWorkItem(fired));
+ multi.enqueue(new MyWorkItem(true));
+
+ multi.purgeItems();
+
+ assertEquals(1, multi.queueLength());
+
+ multi.runOneTask(0);
+ assertTrue(fired.get());
+ }
+
+ private class MyWorkItem extends WorkItem {
+ private final AtomicBoolean fired;
+ private final boolean canceled;
+
+ public MyWorkItem(AtomicBoolean fired) {
+ super(multi, DELAY_MS);
+ this.fired = fired;
+ this.canceled = false;
}
- // wait for each one to complete
- for (MyThread thr : threads) {
- assertTrue("complete " + thr.getSleepMs(), done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS));
- ttm.threadCompleted();
+ public MyWorkItem(boolean canceled) {
+ super(multi, DELAY_MS);
+ this.fired = new AtomicBoolean();
+ this.canceled = canceled;
}
- // check results
- for (MyThread thr : threads) {
- assertEquals("time " + thr.getSleepMs(), thr.texpected, thr.tactual);
+ @Override
+ public void fire() {
+ fired.set(true);
}
- assertTrue(ttm.getMillis() >= tbeg + NTIMES * MIN_SLEEP_MS);
+ @Override
+ public boolean isAssociatedWith(Object associate) {
+ return (fired == associate);
+ }
- // something in the queue, but no threads remain -> exception
- assertThatIllegalStateException().isThrownBy(() -> ttm.threadCompleted());
+ @Override
+ public boolean wasCancelled() {
+ return canceled;
+ }
}
private class MyThread extends Thread {
-
private final long sleepMs;
-
- private volatile long texpected;
- private volatile long tactual;
+ private final CountDownLatch finished = new CountDownLatch(1);
+ private InterruptedException ex = null;
public MyThread(long sleepMs) {
this.sleepMs = sleepMs;
-
this.setDaemon(true);
}
- public long getSleepMs() {
- return sleepMs;
+ public boolean await() throws InterruptedException {
+ return finished.await(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
}
@Override
public void run() {
try {
- for (int x = 0; x < NTIMES; ++x) {
- // negative sleep should have no effect
- texpected = ttm.getMillis();
- ttm.sleep(-1);
- if ((tactual = ttm.getMillis()) != texpected) {
- break;
- }
-
- texpected = ttm.getMillis() + sleepMs;
- ttm.sleep(sleepMs);
-
- if ((tactual = ttm.getMillis()) != texpected) {
- break;
- }
-
- if ((tactual = ttm.getDate().getTime()) != texpected) {
- break;
- }
- }
-
- } catch (InterruptedException expected) {
+ multi.sleep(sleepMs);
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ ex = e;
}
- done.release();
+ finished.countDown();
}
}
}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
index 3e7897e9..d2cf6783 100644
--- a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* Common Utils-Test
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -62,6 +62,11 @@ public class TestTimeTest {
// ensure that no real time has elapsed
assertTrue(System.currentTimeMillis() < treal + tsleep / 2);
+
+ // negative sleep should not modify the time
+ tcur = tm.getMillis();
+ tm.sleep(-1);
+ assertEquals(tcur, tm.getMillis());
}
}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java
new file mode 100644
index 00000000..4e6f92b5
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/WorkItemTest.java
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class WorkItemTest {
+ private TestTime currentTime;
+ private WorkItem item;
+
+ @Before
+ public void setUp() {
+ currentTime = new TestTime();
+ item = new WorkItem(currentTime, 0);
+ }
+
+ @Test
+ public void testWorkItem() {
+ assertThatIllegalArgumentException().isThrownBy(() -> new WorkItem(currentTime, -1));
+
+ // should not throw an exception
+ new WorkItem(currentTime, 1);
+ }
+
+ @Test
+ public void testGetDelay() {
+ assertEquals(1, item.getDelay());
+ }
+
+ @Test
+ public void testWasCancelled() {
+ assertFalse(item.wasCancelled());
+ }
+
+ @Test
+ public void testBumpNextTime() {
+ assertFalse(item.bumpNextTime());
+ }
+
+ @Test
+ public void testBumpNextTimeLong() {
+ assertThatIllegalArgumentException().isThrownBy(() -> item.bumpNextTime(-1));
+
+ long cur = currentTime.getMillis();
+ item.bumpNextTime(5);
+ assertEquals(cur + 5, item.getNextMs());
+
+ item.bumpNextTime(0);
+
+ // should bump the time by at least 1
+ assertEquals(cur + 1, item.getNextMs());
+ }
+
+ @Test
+ public void testInterrupt() {
+ item.interrupt();
+ assertFalse(Thread.interrupted());
+ }
+
+ @Test
+ public void testIsAssociatedWith() {
+ assertFalse(item.isAssociatedWith(this));
+ }
+
+ @Test
+ public void testFire() {
+ // ensure no exception is thrown
+ item.fire();
+ }
+
+ @Test
+ public void testGetNextMs() {
+ assertEquals(currentTime.getMillis() + 1, item.getNextMs());
+ assertEquals(currentTime.getMillis() + 10, new WorkItem(currentTime, 10).getNextMs());
+ }
+
+}