From b4e7fb0364672f73b613b3d04eba9535e97d7c65 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Fri, 15 Jun 2018 09:10:00 -0400 Subject: Add CurrentTime utilities Add some comments. Update license text. Change-Id: I4cee208378c44d9730274aee337feb95f9026add Issue-ID: POLICY-908 Signed-off-by: Jim Hahn --- .../policy/common/utils/time/TestTimeMulti.java | 200 +++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java (limited to 'utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java') diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java new file mode 100644 index 00000000..7a8277c7 --- /dev/null +++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java @@ -0,0 +1,200 @@ +/* + * ============LICENSE_START======================================================= + * Common Utils-Test + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.utils.time; + +import java.util.Date; +import java.util.PriorityQueue; +import java.util.concurrent.CountDownLatch; + +/** + * "Current" time, when running junit tests in multiple threads. This is intended to be + * injected into classes under test, to replace their {@link CurrentTime} objects. The + * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion + * of "current" time forward, allowing threads to resume, as the end of their sleep time + * is reached. Additional threads do not resume until all threads have once again entered + * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a + * thread will not re-enter {@link #sleep(long)}. + */ +public class TestTimeMulti extends CurrentTime { + + /** + * Number of threads that will be sleeping simultaneously. + */ + private int nthreads; + + /** + * "Current" time, in milliseconds, used by tests. + */ + private long tcur = System.currentTimeMillis(); + + /** + * Queue of sleeping threads waiting to be awakened. + */ + private final PriorityQueue queue = new PriorityQueue<>(); + + /** + * Used to synchronize updates. + */ + private final Object locker = new Object(); + + /** + * + * @param nthreads number of threads that will be sleeping simultaneously + */ + public TestTimeMulti(int nthreads) { + this.nthreads = nthreads; + } + + @Override + public long getMillis() { + return tcur; + } + + @Override + public Date getDate() { + return new Date(tcur); + } + + @Override + public void sleep(long sleepMs) throws InterruptedException { + if (sleepMs <= 0) { + return; + } + + Info info = new Info(tcur + sleepMs); + + synchronized (locker) { + queue.add(info); + + if (queue.size() == nthreads) { + // all threads are now sleeping - wake one up + wakeThreads(); + } + } + + // this MUST happen outside of the "synchronized" block + info.await(); + } + + /** + * Indicates that a thread has terminated or that it will no longer be invoking + * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after + * removing the terminated thread. + * + * @throws IllegalStateException if the queue is already full + */ + public void threadCompleted() { + synchronized (locker) { + int sz = queue.size(); + if (sz >= nthreads) { + throw new IllegalStateException("too many threads still sleeping"); + } + + --nthreads; + + if (sz == nthreads) { + // after removing terminated thread - queue is now full; awaken something + wakeThreads(); + } + } + } + + /** + * Advances the "current" time and awakens any threads sleeping until that time. + */ + private void wakeThreads() { + Info info = queue.poll(); + if(info == null) { + return; + } + + tcur = info.getAwakenAtMs(); + info.wake(); + + while ((info = queue.poll()) != null) { + if (tcur == info.getAwakenAtMs()) { + info.wake(); + + } else { + // not ready to wake this thread - put it back in the queue + queue.add(info); + break; + } + } + } + + /** + * Info about a sleeping thread. + */ + private static class Info implements Comparable { + + /** + * Time, in milliseconds, at which the associated thread should awaken. + */ + private final long awakenAtMs; + + /** + * This is triggered when the associated thread should awaken. + */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** + * @param awakenAtMs time, in milliseconds, at which the associated thread should + * awaken + */ + public Info(long awakenAtMs) { + this.awakenAtMs = awakenAtMs; + } + + public long getAwakenAtMs() { + return awakenAtMs; + } + + /** + * Awakens the associated thread by decrementing its latch. + */ + public void wake() { + latch.countDown(); + } + + /** + * Blocks the current thread until awakened (i.e., until its latch is + * decremented). + * + * @throws InterruptedException + */ + public void await() throws InterruptedException { + latch.await(); + } + + @Override + public int compareTo(Info o) { + int diff = Long.compare(awakenAtMs, o.awakenAtMs); + + // this assumes that Object.toString() is unique for each Info object + if (diff == 0) + diff = this.toString().compareTo(o.toString()); + + return diff; + } + + } +} -- cgit 1.2.3-korg