summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java80
-rw-r--r--integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java49
-rw-r--r--integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java103
-rw-r--r--integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java100
-rw-r--r--integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java47
-rw-r--r--integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java13
-rw-r--r--utils-test/pom.xml5
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java59
-rw-r--r--utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java200
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java116
-rw-r--r--utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java66
-rw-r--r--utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java61
-rw-r--r--utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java59
13 files changed, 774 insertions, 184 deletions
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
index f1839b12..58ed8b99 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
@@ -25,10 +25,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import org.onap.policy.common.ia.jpa.IntegrityAuditEntity;
import org.onap.policy.common.logging.eelf.MessageCodes;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
@@ -108,18 +105,6 @@ public class AuditThread extends Thread {
private IntegrityAudit integrityAudit;
/**
- * A latch is taken from this queue before starting an audit. May be {@code null}. Used by JUnit
- * tests.
- */
- private BlockingQueue<CountDownLatch> auditLatchQueue;
-
- /**
- * Latch to be decremented when the next audit completes. May be {@code null}. Used by JUnit
- * tests to wait for an audit to complete.
- */
- private CountDownLatch auditCompletionLatch = null;
-
- /**
* AuditThread constructor.
*
* @param resourceName the resource name
@@ -133,7 +118,7 @@ public class AuditThread extends Thread {
int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws IntegrityAuditException {
this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds),
- integrityAudit, null);
+ integrityAudit);
}
/**
@@ -148,13 +133,12 @@ public class AuditThread extends Thread {
* @throws IntegrityAuditException if an error occurs
*/
public AuditThread(String resourceName, String persistenceUnit, Properties properties, long integrityAuditMillis,
- IntegrityAudit integrityAudit, BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
+ IntegrityAudit integrityAudit) throws IntegrityAuditException {
this.resourceName = resourceName;
this.persistenceUnit = persistenceUnit;
this.properties = properties;
this.integrityAuditPeriodMillis = integrityAuditMillis;
this.integrityAudit = integrityAudit;
- this.auditLatchQueue = queue;
/*
* The DbDAO Constructor registers this node in the IntegrityAuditEntity table. Each
@@ -174,14 +158,8 @@ public class AuditThread extends Thread {
logger.info("AuditThread.run: Entering");
try {
- /*
- * For JUnit testing: wait for the first latch, decrement it to indicate that the thread
- * has started, and then wait for the next latch, before we actually start doing
- * anything. These simply return if there is no latch queue defined.
- */
- getNextLatch();
- decrementLatch();
- getNextLatch();
+ // for junit testing
+ runStarted();
/*
* Triggers change in designation, unless no other viable candidate.
@@ -284,11 +262,8 @@ public class AuditThread extends Thread {
* property, otherwise just sleep the normal interval.
*/
if (auditCompleted) {
- // indicate that an audit has completed
- decrementLatch();
-
- // don't start the next audit cycle until a latch has been provided
- getNextLatch();
+ // for junit testing: indicate that an audit has completed
+ auditCompleted();
if (logger.isDebugEnabled()) {
logger.debug("AuditThread.run: Audit completed; resourceName=" + this.resourceName
@@ -342,29 +317,6 @@ public class AuditThread extends Thread {
}
/**
- * Gets the next audit-completion latch from the queue. Blocks, if the queue is empty.
- *
- * @throws InterruptedException if interrupted while waiting
- */
- private void getNextLatch() throws InterruptedException {
- BlockingQueue<CountDownLatch> queue = this.auditLatchQueue;
- if (queue != null) {
- this.auditCompletionLatch = queue.take();
- }
- }
-
- /**
- * Decrements the current audit-completion latch, if any.
- */
- private void decrementLatch() {
- CountDownLatch latch = this.auditCompletionLatch;
- if (latch != null) {
- this.auditCompletionLatch = null;
- latch.countDown();
- }
- }
-
- /**
* Determines if an exception is an InterruptedException or was caused by an
* InterruptedException.
*
@@ -788,6 +740,26 @@ public class AuditThread extends Thread {
}
/**
+ * Indicates that the {@link #run()} method has started. This method simply returns,
+ * and may overridden by junit tests.
+ *
+ * @throws InterruptedException
+ */
+ public void runStarted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
+ * Indicates that an audit has completed. This method simply returns, and may
+ * overridden by junit tests.
+ *
+ * @throws InterruptedException
+ */
+ public void auditCompleted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
* Adjusts the thread-sleep-interval to be used when an audit has <i>not</i> been completed.
* Used by JUnit tests.
*
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
index 9abdbe52..b3330faf 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
@@ -21,9 +21,6 @@
package org.onap.policy.common.ia;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-
import org.onap.policy.common.ia.IntegrityAuditProperties.NodeTypeEnum;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
@@ -217,37 +214,20 @@ public class IntegrityAudit {
* @throws IntegrityAuditException if an error occurs
*/
public void startAuditThread() throws IntegrityAuditException {
- startAuditThread(null);
- }
-
- /**
- * Starts the audit thread.
- *
- * @param queue the queue
- * @return {@code true} if the thread was started, {@code false} otherwise
- * @throws IntegrityAuditException if an error occurs
- */
- protected boolean startAuditThread(BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
-
logger.info("startAuditThread: Entering");
- boolean success = false;
-
if (integrityAuditPeriodMillis >= 0) {
- this.auditThread = new AuditThread(this.resourceName, this.persistenceUnit, this.properties,
- integrityAuditPeriodMillis, this, queue);
+ this.auditThread = makeAuditThread(this.resourceName, this.persistenceUnit, this.properties, integrityAuditPeriodMillis);
logger.info("startAuditThread: Audit started and will run every " + integrityAuditPeriodMillis / 1000
+ " seconds");
this.auditThread.start();
- success = true;
+
} else {
logger.info("startAuditThread: Suppressing integrity audit, integrityAuditPeriodSeconds="
+ integrityAuditPeriodMillis / 1000);
}
logger.info("startAuditThread: Exiting");
-
- return success;
}
/**
@@ -299,4 +279,29 @@ public class IntegrityAudit {
return !this.auditThread.isAlive();
}
}
+
+ /**
+ *
+ * @return {@code true} if an audit thread exists, {@code false} otherwise
+ */
+ protected boolean haveAuditThread() {
+ return (this.auditThread != null);
+ }
+
+ /**
+ * Creates an audit thread. May be overridden by junit tests.
+ *
+ * @param resourceName2
+ * @param persistenceUnit2
+ * @param properties2
+ * @param integrityAuditPeriodMillis2
+ *
+ * @return a new audit thread
+ * @throws IntegrityAuditException
+ */
+ protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+ long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+
+ return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this);
+ }
}
diff --git a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
index afbcc452..c9179908 100644
--- a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
+++ b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
@@ -22,10 +22,6 @@ package org.onap.policy.common.ia;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,21 +30,19 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Persistence;
-
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.slf4j.LoggerFactory;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
/**
* All JUnits are designed to run in the local development environment where they have write
@@ -464,26 +458,26 @@ public class IntegrityAuditTestBase {
protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException {
// start an audit cycle on each auditor
- List<CountDownLatch> latches = new ArrayList<>(auditors.length);
+ List<Semaphore> semaphores = new ArrayList<>(auditors.length);
for (MyIntegrityAudit p : auditors) {
- latches.add(p.startAudit());
+ semaphores.add(p.startAudit());
}
// wait for each auditor to complete its cycle
- for (CountDownLatch latch : latches) {
- waitLatch(latch);
+ for (Semaphore sem : semaphores) {
+ waitSem(sem);
}
}
/**
- * Waits for a latch to reach zero.
+ * Waits for a semaphore to be released.
*
- * @param latch the latch to wait for
+ * @param sem the semaphore for which to wait
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the latch did not reach zero in the allotted time
*/
- protected void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+ protected void waitSem(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.SECONDS));
}
/**
@@ -520,11 +514,16 @@ public class IntegrityAuditTestBase {
* Manages audits by inserting latches into a queue for the AuditThread to count.
*/
protected class MyIntegrityAudit extends IntegrityAudit {
-
+
+ /**
+ * Semaphore on which the audit thread should wait.
+ */
+ private Semaphore auditSem = null;
+
/**
- * Queue from which the AuditThread will take latches.
+ * Semaphore on which the junit management thread should wait.
*/
- private BlockingQueue<CountDownLatch> queue = null;
+ private Semaphore junitSem = null;
/**
* Constructs an auditor and starts the AuditThread.
@@ -550,16 +549,14 @@ public class IntegrityAuditTestBase {
}
/**
- * Triggers an audit by adding a latch to the queue.
+ * Triggers an audit by releasing the audit thread's semaphore.
*
- * @return the latch that was added
+ * @return the semaphore on which to wait
* @throws InterruptedException if the thread is interrupted
*/
- public CountDownLatch startAudit() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- queue.add(latch);
-
- return latch;
+ public Semaphore startAudit() throws InterruptedException {
+ auditSem.release();
+ return junitSem;
}
/**
@@ -567,25 +564,23 @@ public class IntegrityAuditTestBase {
*/
@Override
public final void startAuditThread() throws IntegrityAuditException {
- if (queue != null) {
- // queue up a bogus latch, in case a thread is still running
- queue.add(new CountDownLatch(1) {
- @Override
- public void countDown() {
- throw new RuntimeException("auditor has multiple threads");
- }
- });
+ if (auditSem != null) {
+ // release a bunch of semaphores, in case a thread is still running
+ auditSem.release(1000);
}
+
+ auditSem = new Semaphore(0);
+ junitSem = new Semaphore(0);
+
+ super.startAuditThread();
- queue = new LinkedBlockingQueue<>();
+ if (haveAuditThread()) {
+ // tell the thread it can run
+ auditSem.release();
- if (super.startAuditThread(queue)) {
// wait for the thread to start
- CountDownLatch latch = new CountDownLatch(1);
- queue.add(latch);
-
try {
- waitLatch(latch);
+ waitSem(junitSem);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -605,5 +600,31 @@ public class IntegrityAuditTestBase {
assertTrue(waitThread(this));
}
+
+ @Override
+ protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+ long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+
+ return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this) {
+
+ private Semaphore auditSem = MyIntegrityAudit.this.auditSem;
+ private Semaphore junitSem = MyIntegrityAudit.this.junitSem;
+
+ @Override
+ public void runStarted() throws InterruptedException {
+ auditSem.acquire();
+
+ junitSem.release();
+ auditSem.acquire();
+ }
+
+ @Override
+ public void auditCompleted() throws InterruptedException {
+ junitSem.release();
+ auditSem.acquire();
+ }
+
+ };
+ }
}
}
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
index c32a2213..38dc20dc 100644
--- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
+++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
@@ -29,10 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.persistence.EntityManager;
@@ -43,7 +40,6 @@ import javax.persistence.LockModeType;
import javax.persistence.Persistence;
import javax.persistence.Query;
import javax.validation.constraints.NotNull;
-
import org.onap.policy.common.im.jmx.ComponentAdmin;
import org.onap.policy.common.im.jmx.ComponentAdminMBean;
import org.onap.policy.common.im.jmx.JmxAgentConnection;
@@ -196,7 +192,7 @@ public class IntegrityMonitor {
*/
protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException {
- this(resourceName, properties, null);
+ this(resourceName, properties, new Factory());
}
/**
@@ -207,10 +203,10 @@ public class IntegrityMonitor {
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @throws IntegrityMonitorException if any errors are encountered in the constructor
*/
- protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue)
+ protected IntegrityMonitor(String resourceName, Properties properties, Factory factory)
throws IntegrityMonitorException {
// singleton check since this constructor can be called from a child or
@@ -357,7 +353,8 @@ public class IntegrityMonitor {
logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
}
- fpManager = new FpManager(queue);
+ fpManager = new FpManager(factory);
+ fpManager.start();
}
@@ -373,7 +370,7 @@ public class IntegrityMonitor {
*/
public static IntegrityMonitor getInstance(String resourceName, Properties properties)
throws IntegrityMonitorException {
- return getInstance(resourceName, properties, null);
+ return getInstance(resourceName, properties, new Factory());
}
/**
@@ -382,13 +379,13 @@ public class IntegrityMonitor {
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @return The new instance of IntegrityMonitor
* @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an
* exception
*/
protected static IntegrityMonitor getInstance(String resourceName, Properties properties,
- BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException {
+ Factory factory) throws IntegrityMonitorException {
synchronized (getInstanceLock) {
logger.debug("getInstance() called - resourceName= {}", resourceName);
@@ -399,7 +396,7 @@ public class IntegrityMonitor {
if (instance == null) {
logger.debug("Creating new instance of IntegrityMonitor");
- instance = new IntegrityMonitor(resourceName, properties, queue);
+ instance = new IntegrityMonitor(resourceName, properties, factory);
}
return instance;
}
@@ -1740,18 +1737,15 @@ public class IntegrityMonitor {
* dependencies, does a refresh state audit and runs the stateAudit.
*/
class FpManager extends Thread {
- private final CountDownLatch stopper = new CountDownLatch(1);
+ private boolean stopRequested = false;
- private BlockingQueue<CountDownLatch> queue;
- private CountDownLatch progressLatch = null;
+ private final Factory factory;
// Constructor - start FP manager thread
- FpManager(BlockingQueue<CountDownLatch> queue) {
- this.queue = queue;
+ FpManager(Factory factory) {
+ this.factory = factory;
// set now as the last time the refreshStateAudit ran
IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date();
- // start thread
- this.start();
}
@Override
@@ -1759,13 +1753,13 @@ public class IntegrityMonitor {
logger.debug("FPManager thread running");
try {
- getLatch();
- decrementLatch();
+ factory.runStarted();
- while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) {
- getLatch();
+ while(!stopRequested) {
+ factory.doSleep(cycleIntervalMillis);
+
IntegrityMonitor.this.runOnce();
- decrementLatch();
+ factory.monitorCompleted();
}
} catch (InterruptedException e) {
@@ -1775,31 +1769,9 @@ public class IntegrityMonitor {
}
public void stopAndExit() {
- stopper.countDown();
+ stopRequested = true;
this.interrupt();
}
-
- /**
- * Gets the next latch from the queue.
- *
- * @throws InterruptedException
- *
- */
- private void getLatch() throws InterruptedException {
- if (queue != null) {
- progressLatch = queue.take();
- }
- }
-
- /**
- * Decrements the current latch.
- */
- private void decrementLatch() {
- if (progressLatch != null) {
- progressLatch.countDown();
- }
- }
-
}
private void runOnce() {
@@ -1934,6 +1906,40 @@ public class IntegrityMonitor {
return allNotWellMap;
}
+ /**
+ * Used to access various objects. Overridden by junit tests.
+ */
+ public static class Factory {
+
+ /**
+ * Indicates that the {@link FpManager#run()} method has started. This method
+ * simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void runStarted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
+ * Sleeps for a period of time.
+ * @param sleepMs amount of time to sleep, in milliseconds
+ * @throws InterruptedException
+ */
+ public void doSleep(long sleepMs) throws InterruptedException {
+ Thread.sleep(sleepMs);
+ }
+
+ /**
+ * Indicates that a monitor activity has completed. This method simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void monitorCompleted() throws InterruptedException {
+ // does nothing
+ }
+ }
+
/*
* The remaining methods are used by JUnit tests.
*/
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
index 7f1e5516..091dcc91 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
@@ -22,23 +22,19 @@ package org.onap.policy.common.im;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
import java.util.Date;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.Semaphore;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.persistence.TemporalType;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.onap.policy.common.im.IntegrityMonitor.Factory;
import org.onap.policy.common.im.jpa.ForwardProgressEntity;
import org.onap.policy.common.im.jpa.ResourceRegistrationEntity;
import org.onap.policy.common.im.jpa.StateManagementEntity;
@@ -57,7 +53,8 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
private static EntityTransaction et;
private static String resourceName;
- private BlockingQueue<CountDownLatch> queue;
+ private Semaphore monitorSem;
+ private Semaphore junitSem;
/**
* Set up for test class.
@@ -900,9 +897,36 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception {
IntegrityMonitor.deleteInstance();
- queue = new LinkedBlockingQueue<>();
+ monitorSem = new Semaphore(0);
+ junitSem = new Semaphore(0);
+
+ Factory factory = new IntegrityMonitor.Factory() {
+
+ @Override
+ public void doSleep(long sleepMs) throws InterruptedException {
+ /*
+ * No need to sleep, as the thread won't progress until the
+ * semaphore is released.
+ */
+ }
+
+ @Override
+ public void runStarted() throws InterruptedException {
+ monitorSem.acquire();
+
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ @Override
+ public void monitorCompleted() throws InterruptedException {
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ };
- IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue);
+ IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory);
// wait for the monitor thread to start
waitStep();
@@ -916,8 +940,7 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
* @throws InterruptedException if the thread is interrupted
*/
private void waitStep() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- queue.offer(latch);
- waitLatch(latch);
+ monitorSem.release();
+ waitSem(junitSem);
}
}
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
index 0c8259b7..e5562306 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
@@ -22,17 +22,14 @@ package org.onap.policy.common.im;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
-
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.slf4j.Logger;
@@ -243,14 +240,14 @@ public class IntegrityMonitorTestBase {
}
/**
- * Waits for a latch to reach zero.
+ * Waits for a semaphore to be acquired
*
- * @param latch the latch
+ * @param sem the latch
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the latch did not reach zero in the allotted time
*/
- protected void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+ protected void waitSem(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
}
/**
diff --git a/utils-test/pom.xml b/utils-test/pom.xml
index a7c2eaee..933104b0 100644
--- a/utils-test/pom.xml
+++ b/utils-test/pom.xml
@@ -42,6 +42,11 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
new file mode 100644
index 00000000..3dfed4bd
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * "Current" time, when running junit tests. This is intended to be injected into classes
+ * under test, to replace their {@link CurrentTime} objects. When {@link #sleep(long)} is
+ * invoked, it simply advances the notion of "current" time and returns immediately.
+ */
+public class TestTime extends CurrentTime {
+
+ /**
+ * "Current" time, in milliseconds, used by tests.
+ */
+ private AtomicLong tcur = new AtomicLong(System.currentTimeMillis());
+
+ /**
+ *
+ */
+ public TestTime() {
+ super();
+ }
+
+ @Override
+ public long getMillis() {
+ return tcur.get();
+ }
+
+ @Override
+ public Date getDate() {
+ return new Date(tcur.get());
+ }
+
+ @Override
+ public void sleep(long sleepMs) throws InterruptedException {
+ tcur.addAndGet(sleepMs);
+ }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
new file mode 100644
index 00000000..7a8277c7
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
@@ -0,0 +1,200 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.PriorityQueue;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * "Current" time, when running junit tests in multiple threads. This is intended to be
+ * injected into classes under test, to replace their {@link CurrentTime} objects. The
+ * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
+ * of "current" time forward, allowing threads to resume, as the end of their sleep time
+ * is reached. Additional threads do not resume until all threads have once again entered
+ * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
+ * thread will not re-enter {@link #sleep(long)}.
+ */
+public class TestTimeMulti extends CurrentTime {
+
+ /**
+ * Number of threads that will be sleeping simultaneously.
+ */
+ private int nthreads;
+
+ /**
+ * "Current" time, in milliseconds, used by tests.
+ */
+ private long tcur = System.currentTimeMillis();
+
+ /**
+ * Queue of sleeping threads waiting to be awakened.
+ */
+ private final PriorityQueue<Info> queue = new PriorityQueue<>();
+
+ /**
+ * Used to synchronize updates.
+ */
+ private final Object locker = new Object();
+
+ /**
+ *
+ * @param nthreads number of threads that will be sleeping simultaneously
+ */
+ public TestTimeMulti(int nthreads) {
+ this.nthreads = nthreads;
+ }
+
+ @Override
+ public long getMillis() {
+ return tcur;
+ }
+
+ @Override
+ public Date getDate() {
+ return new Date(tcur);
+ }
+
+ @Override
+ public void sleep(long sleepMs) throws InterruptedException {
+ if (sleepMs <= 0) {
+ return;
+ }
+
+ Info info = new Info(tcur + sleepMs);
+
+ synchronized (locker) {
+ queue.add(info);
+
+ if (queue.size() == nthreads) {
+ // all threads are now sleeping - wake one up
+ wakeThreads();
+ }
+ }
+
+ // this MUST happen outside of the "synchronized" block
+ info.await();
+ }
+
+ /**
+ * Indicates that a thread has terminated or that it will no longer be invoking
+ * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
+ * removing the terminated thread.
+ *
+ * @throws IllegalStateException if the queue is already full
+ */
+ public void threadCompleted() {
+ synchronized (locker) {
+ int sz = queue.size();
+ if (sz >= nthreads) {
+ throw new IllegalStateException("too many threads still sleeping");
+ }
+
+ --nthreads;
+
+ if (sz == nthreads) {
+ // after removing terminated thread - queue is now full; awaken something
+ wakeThreads();
+ }
+ }
+ }
+
+ /**
+ * Advances the "current" time and awakens any threads sleeping until that time.
+ */
+ private void wakeThreads() {
+ Info info = queue.poll();
+ if(info == null) {
+ return;
+ }
+
+ tcur = info.getAwakenAtMs();
+ info.wake();
+
+ while ((info = queue.poll()) != null) {
+ if (tcur == info.getAwakenAtMs()) {
+ info.wake();
+
+ } else {
+ // not ready to wake this thread - put it back in the queue
+ queue.add(info);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Info about a sleeping thread.
+ */
+ private static class Info implements Comparable<Info> {
+
+ /**
+ * Time, in milliseconds, at which the associated thread should awaken.
+ */
+ private final long awakenAtMs;
+
+ /**
+ * This is triggered when the associated thread should awaken.
+ */
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ * @param awakenAtMs time, in milliseconds, at which the associated thread should
+ * awaken
+ */
+ public Info(long awakenAtMs) {
+ this.awakenAtMs = awakenAtMs;
+ }
+
+ public long getAwakenAtMs() {
+ return awakenAtMs;
+ }
+
+ /**
+ * Awakens the associated thread by decrementing its latch.
+ */
+ public void wake() {
+ latch.countDown();
+ }
+
+ /**
+ * Blocks the current thread until awakened (i.e., until its latch is
+ * decremented).
+ *
+ * @throws InterruptedException
+ */
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+
+ @Override
+ public int compareTo(Info o) {
+ int diff = Long.compare(awakenAtMs, o.awakenAtMs);
+
+ // this assumes that Object.toString() is unique for each Info object
+ if (diff == 0)
+ diff = this.toString().compareTo(o.toString());
+
+ return diff;
+ }
+
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
new file mode 100644
index 00000000..206ab5f1
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TestTimeMultiTest {
+
+ private static final int NTHREADS = 10;
+ private static final int NTIMES = 100;
+ private static final long WAIT_SEC = 5L;
+ private static final long MIN_SLEEP_MS = 5L;
+
+ private TestTimeMulti ttm;
+ private Semaphore done;
+
+ @Test
+ public void test() throws Exception {
+ ttm = new TestTimeMulti(NTHREADS);
+ done = new Semaphore(0);
+
+ long tbeg = ttm.getMillis();
+
+ // create threads
+ List<MyThread> threads = new ArrayList<>(NTHREADS);
+ for (int x = 0; x < NTHREADS; ++x) {
+ threads.add(new MyThread(x + MIN_SLEEP_MS));
+ }
+
+ // launch threads
+ for (MyThread thr : threads) {
+ thr.start();
+ }
+
+ // wait for each one to complete
+ for (MyThread thr : threads) {
+ assertTrue("complete " + thr.getSleepMs(), done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS));
+ ttm.threadCompleted();
+ }
+
+ // check results
+ for (MyThread thr : threads) {
+ assertEquals("time " + thr.getSleepMs(), thr.texpected, thr.tactual);
+ }
+
+ assertTrue(ttm.getMillis() >= tbeg + NTIMES * MIN_SLEEP_MS);
+ }
+
+ private class MyThread extends Thread {
+
+ private final long sleepMs;
+
+ private volatile long texpected;
+ private volatile long tactual;
+
+ public MyThread(long sleepMs) {
+ this.sleepMs = sleepMs;
+
+ this.setDaemon(true);
+ }
+
+ public long getSleepMs() {
+ return sleepMs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int x = 0; x < NTIMES; ++x) {
+ texpected = ttm.getMillis() + sleepMs;
+ ttm.sleep(sleepMs);
+
+ if ((tactual = ttm.getMillis()) != texpected) {
+ break;
+ }
+
+ if ((tactual = ttm.getDate().getTime()) != texpected) {
+ break;
+ }
+ }
+
+ } catch (InterruptedException expected) {
+ Thread.currentThread().interrupt();
+ }
+
+ done.release();
+ }
+ }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
new file mode 100644
index 00000000..c1e15b38
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class TestTimeTest {
+
+ @Test
+ public void test() throws Exception {
+ TestTime tm = new TestTime();
+ TestTime tm2 = new TestTime();
+
+ long treal = System.currentTimeMillis();
+
+ long tcur = tm.getMillis();
+ assertEquals(tcur, tm.getDate().getTime());
+
+ long tsleep = 10000L;
+ long tcur2 = tcur;
+
+ // sleep a bit and then check values
+ tcur2 += tsleep;
+ tm2.sleep(tsleep);
+ assertEquals(tcur2, tm2.getMillis());
+ assertEquals(tcur2, tm2.getDate().getTime());
+
+ // sleep some more and then check values
+ tcur2 += tsleep;
+ tm2.sleep(tsleep);
+ assertEquals(tcur2, tm2.getMillis());
+ assertEquals(tcur2, tm2.getDate().getTime());
+
+ // check again - to ensure unchanged
+ assertEquals(tcur2, tm2.getMillis());
+ assertEquals(tcur2, tm2.getDate().getTime());
+
+ // original should also be unchanged
+ assertEquals(tcur, tm.getMillis());
+ assertEquals(tcur, tm.getDate().getTime());
+
+ // ensure that no real time has elapsed
+ assertTrue(System.currentTimeMillis() < treal + tsleep / 2);
+ }
+
+}
diff --git a/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java b/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java
new file mode 100644
index 00000000..cab469e5
--- /dev/null
+++ b/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+
+/**
+ * Methods to access the current time. Classes can use objects of this type to get current
+ * time information, while allowing the objects to be overridden by junit tests.
+ */
+public class CurrentTime {
+
+ /**
+ *
+ */
+ public CurrentTime() {
+ super();
+ }
+
+ /**
+ * @return the current time, in milliseconds
+ */
+ public long getMillis() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * @return the current Date
+ */
+ public Date getDate() {
+ return new Date();
+ }
+
+ /**
+ * Sleeps for a period of time.
+ *
+ * @param sleepMs amount of time to sleep, in milliseconds
+ * @throws InterruptedException
+ */
+ public void sleep(long sleepMs) throws InterruptedException {
+ Thread.sleep(sleepMs);
+ }
+}
diff --git a/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java b/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java
new file mode 100644
index 00000000..694a3d21
--- /dev/null
+++ b/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class CurrentTimeTest {
+
+ @Test
+ public void testGetMillis() {
+ long tcur = System.currentTimeMillis();
+ long tval = new CurrentTime().getMillis();
+ long tval2 = new CurrentTime().getMillis();
+ long tend = System.currentTimeMillis();
+
+ assertTrue(tval >= tcur && tval <= tend);
+ assertTrue(tval2 >= tcur && tval2 <= tend);
+ }
+
+ @Test
+ public void testGetDate() {
+ long tcur = System.currentTimeMillis();
+ long tval = new CurrentTime().getDate().getTime();
+ long tval2 = new CurrentTime().getDate().getTime();
+ long tend = System.currentTimeMillis();
+
+ assertTrue(tval >= tcur && tval <= tend);
+ assertTrue(tval2 >= tcur && tval2 <= tend);
+ }
+
+ @Test
+ public void testSleep() throws Exception {
+ long tcur = System.currentTimeMillis();
+ new CurrentTime().sleep(10);
+ long tend = System.currentTimeMillis();
+
+ assertTrue(tend >= tcur + 10 - 1);
+ }
+
+}