diff options
3 files changed, 93 insertions, 67 deletions
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java index c32a2213..38dc20dc 100644 --- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java +++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java @@ -29,10 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import javax.management.JMX; import javax.management.MBeanServerConnection; import javax.persistence.EntityManager; @@ -43,7 +40,6 @@ import javax.persistence.LockModeType; import javax.persistence.Persistence; import javax.persistence.Query; import javax.validation.constraints.NotNull; - import org.onap.policy.common.im.jmx.ComponentAdmin; import org.onap.policy.common.im.jmx.ComponentAdminMBean; import org.onap.policy.common.im.jmx.JmxAgentConnection; @@ -196,7 +192,7 @@ public class IntegrityMonitor { */ protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException { - this(resourceName, properties, null); + this(resourceName, properties, new Factory()); } /** @@ -207,10 +203,10 @@ public class IntegrityMonitor { * * @param resourceName The resource name of the resource * @param properties a set of properties passed in from the resource - * @param queue queue to use to control the FPManager thread, or {@code null} + * @param factory Factory to use to control the FPManager thread * @throws IntegrityMonitorException if any errors are encountered in the constructor */ - protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue) + protected IntegrityMonitor(String resourceName, Properties properties, Factory factory) throws IntegrityMonitorException { // singleton check since this constructor can be called from a child or @@ -357,7 +353,8 @@ public class IntegrityMonitor { logger.error("ComponentAdmin constructor exception: {}", e.toString(), e); } - fpManager = new FpManager(queue); + fpManager = new FpManager(factory); + fpManager.start(); } @@ -373,7 +370,7 @@ public class IntegrityMonitor { */ public static IntegrityMonitor getInstance(String resourceName, Properties properties) throws IntegrityMonitorException { - return getInstance(resourceName, properties, null); + return getInstance(resourceName, properties, new Factory()); } /** @@ -382,13 +379,13 @@ public class IntegrityMonitor { * * @param resourceName The resource name of the resource * @param properties a set of properties passed in from the resource - * @param queue queue to use to control the FPManager thread, or {@code null} + * @param factory Factory to use to control the FPManager thread * @return The new instance of IntegrityMonitor * @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an * exception */ protected static IntegrityMonitor getInstance(String resourceName, Properties properties, - BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException { + Factory factory) throws IntegrityMonitorException { synchronized (getInstanceLock) { logger.debug("getInstance() called - resourceName= {}", resourceName); @@ -399,7 +396,7 @@ public class IntegrityMonitor { if (instance == null) { logger.debug("Creating new instance of IntegrityMonitor"); - instance = new IntegrityMonitor(resourceName, properties, queue); + instance = new IntegrityMonitor(resourceName, properties, factory); } return instance; } @@ -1740,18 +1737,15 @@ public class IntegrityMonitor { * dependencies, does a refresh state audit and runs the stateAudit. */ class FpManager extends Thread { - private final CountDownLatch stopper = new CountDownLatch(1); + private boolean stopRequested = false; - private BlockingQueue<CountDownLatch> queue; - private CountDownLatch progressLatch = null; + private final Factory factory; // Constructor - start FP manager thread - FpManager(BlockingQueue<CountDownLatch> queue) { - this.queue = queue; + FpManager(Factory factory) { + this.factory = factory; // set now as the last time the refreshStateAudit ran IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date(); - // start thread - this.start(); } @Override @@ -1759,13 +1753,13 @@ public class IntegrityMonitor { logger.debug("FPManager thread running"); try { - getLatch(); - decrementLatch(); + factory.runStarted(); - while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) { - getLatch(); + while(!stopRequested) { + factory.doSleep(cycleIntervalMillis); + IntegrityMonitor.this.runOnce(); - decrementLatch(); + factory.monitorCompleted(); } } catch (InterruptedException e) { @@ -1775,31 +1769,9 @@ public class IntegrityMonitor { } public void stopAndExit() { - stopper.countDown(); + stopRequested = true; this.interrupt(); } - - /** - * Gets the next latch from the queue. - * - * @throws InterruptedException - * - */ - private void getLatch() throws InterruptedException { - if (queue != null) { - progressLatch = queue.take(); - } - } - - /** - * Decrements the current latch. - */ - private void decrementLatch() { - if (progressLatch != null) { - progressLatch.countDown(); - } - } - } private void runOnce() { @@ -1934,6 +1906,40 @@ public class IntegrityMonitor { return allNotWellMap; } + /** + * Used to access various objects. Overridden by junit tests. + */ + public static class Factory { + + /** + * Indicates that the {@link FpManager#run()} method has started. This method + * simply returns. + * + * @throws InterruptedException + */ + public void runStarted() throws InterruptedException { + // does nothing + } + + /** + * Sleeps for a period of time. + * @param sleepMs amount of time to sleep, in milliseconds + * @throws InterruptedException + */ + public void doSleep(long sleepMs) throws InterruptedException { + Thread.sleep(sleepMs); + } + + /** + * Indicates that a monitor activity has completed. This method simply returns. + * + * @throws InterruptedException + */ + public void monitorCompleted() throws InterruptedException { + // does nothing + } + } + /* * The remaining methods are used by JUnit tests. */ diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java index 7f1e5516..091dcc91 100644 --- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java +++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java @@ -22,23 +22,19 @@ package org.onap.policy.common.im; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; - import java.util.Date; import java.util.List; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - +import java.util.concurrent.Semaphore; import javax.persistence.EntityTransaction; import javax.persistence.Query; import javax.persistence.TemporalType; - import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.onap.policy.common.im.IntegrityMonitor.Factory; import org.onap.policy.common.im.jpa.ForwardProgressEntity; import org.onap.policy.common.im.jpa.ResourceRegistrationEntity; import org.onap.policy.common.im.jpa.StateManagementEntity; @@ -57,7 +53,8 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase { private static EntityTransaction et; private static String resourceName; - private BlockingQueue<CountDownLatch> queue; + private Semaphore monitorSem; + private Semaphore junitSem; /** * Set up for test class. @@ -900,9 +897,36 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase { private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception { IntegrityMonitor.deleteInstance(); - queue = new LinkedBlockingQueue<>(); + monitorSem = new Semaphore(0); + junitSem = new Semaphore(0); + + Factory factory = new IntegrityMonitor.Factory() { + + @Override + public void doSleep(long sleepMs) throws InterruptedException { + /* + * No need to sleep, as the thread won't progress until the + * semaphore is released. + */ + } + + @Override + public void runStarted() throws InterruptedException { + monitorSem.acquire(); + + junitSem.release(); + monitorSem.acquire(); + } + + @Override + public void monitorCompleted() throws InterruptedException { + junitSem.release(); + monitorSem.acquire(); + } + + }; - IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue); + IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory); // wait for the monitor thread to start waitStep(); @@ -916,8 +940,7 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase { * @throws InterruptedException if the thread is interrupted */ private void waitStep() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - queue.offer(latch); - waitLatch(latch); + monitorSem.release(); + waitSem(junitSem); } } diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java index 0c8259b7..e5562306 100644 --- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java +++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java @@ -22,17 +22,14 @@ package org.onap.policy.common.im; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.FileOutputStream; import java.io.IOException; import java.util.Properties; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.Persistence; - import org.onap.policy.common.utils.jpa.EntityTransCloser; import org.onap.policy.common.utils.test.log.logback.ExtractAppender; import org.slf4j.Logger; @@ -243,14 +240,14 @@ public class IntegrityMonitorTestBase { } /** - * Waits for a latch to reach zero. + * Waits for a semaphore to be acquired * - * @param latch the latch + * @param sem the latch * @throws InterruptedException if the thread is interrupted * @throws AssertionError if the latch did not reach zero in the allotted time */ - protected void waitLatch(CountDownLatch latch) throws InterruptedException { - assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS)); + protected void waitSem(Semaphore sem) throws InterruptedException { + assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS)); } /** |