aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java100
-rw-r--r--integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java47
-rw-r--r--integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java13
3 files changed, 93 insertions, 67 deletions
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
index c32a2213..38dc20dc 100644
--- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
+++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
@@ -29,10 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.persistence.EntityManager;
@@ -43,7 +40,6 @@ import javax.persistence.LockModeType;
import javax.persistence.Persistence;
import javax.persistence.Query;
import javax.validation.constraints.NotNull;
-
import org.onap.policy.common.im.jmx.ComponentAdmin;
import org.onap.policy.common.im.jmx.ComponentAdminMBean;
import org.onap.policy.common.im.jmx.JmxAgentConnection;
@@ -196,7 +192,7 @@ public class IntegrityMonitor {
*/
protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException {
- this(resourceName, properties, null);
+ this(resourceName, properties, new Factory());
}
/**
@@ -207,10 +203,10 @@ public class IntegrityMonitor {
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @throws IntegrityMonitorException if any errors are encountered in the constructor
*/
- protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue)
+ protected IntegrityMonitor(String resourceName, Properties properties, Factory factory)
throws IntegrityMonitorException {
// singleton check since this constructor can be called from a child or
@@ -357,7 +353,8 @@ public class IntegrityMonitor {
logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
}
- fpManager = new FpManager(queue);
+ fpManager = new FpManager(factory);
+ fpManager.start();
}
@@ -373,7 +370,7 @@ public class IntegrityMonitor {
*/
public static IntegrityMonitor getInstance(String resourceName, Properties properties)
throws IntegrityMonitorException {
- return getInstance(resourceName, properties, null);
+ return getInstance(resourceName, properties, new Factory());
}
/**
@@ -382,13 +379,13 @@ public class IntegrityMonitor {
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @return The new instance of IntegrityMonitor
* @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an
* exception
*/
protected static IntegrityMonitor getInstance(String resourceName, Properties properties,
- BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException {
+ Factory factory) throws IntegrityMonitorException {
synchronized (getInstanceLock) {
logger.debug("getInstance() called - resourceName= {}", resourceName);
@@ -399,7 +396,7 @@ public class IntegrityMonitor {
if (instance == null) {
logger.debug("Creating new instance of IntegrityMonitor");
- instance = new IntegrityMonitor(resourceName, properties, queue);
+ instance = new IntegrityMonitor(resourceName, properties, factory);
}
return instance;
}
@@ -1740,18 +1737,15 @@ public class IntegrityMonitor {
* dependencies, does a refresh state audit and runs the stateAudit.
*/
class FpManager extends Thread {
- private final CountDownLatch stopper = new CountDownLatch(1);
+ private boolean stopRequested = false;
- private BlockingQueue<CountDownLatch> queue;
- private CountDownLatch progressLatch = null;
+ private final Factory factory;
// Constructor - start FP manager thread
- FpManager(BlockingQueue<CountDownLatch> queue) {
- this.queue = queue;
+ FpManager(Factory factory) {
+ this.factory = factory;
// set now as the last time the refreshStateAudit ran
IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date();
- // start thread
- this.start();
}
@Override
@@ -1759,13 +1753,13 @@ public class IntegrityMonitor {
logger.debug("FPManager thread running");
try {
- getLatch();
- decrementLatch();
+ factory.runStarted();
- while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) {
- getLatch();
+ while(!stopRequested) {
+ factory.doSleep(cycleIntervalMillis);
+
IntegrityMonitor.this.runOnce();
- decrementLatch();
+ factory.monitorCompleted();
}
} catch (InterruptedException e) {
@@ -1775,31 +1769,9 @@ public class IntegrityMonitor {
}
public void stopAndExit() {
- stopper.countDown();
+ stopRequested = true;
this.interrupt();
}
-
- /**
- * Gets the next latch from the queue.
- *
- * @throws InterruptedException
- *
- */
- private void getLatch() throws InterruptedException {
- if (queue != null) {
- progressLatch = queue.take();
- }
- }
-
- /**
- * Decrements the current latch.
- */
- private void decrementLatch() {
- if (progressLatch != null) {
- progressLatch.countDown();
- }
- }
-
}
private void runOnce() {
@@ -1934,6 +1906,40 @@ public class IntegrityMonitor {
return allNotWellMap;
}
+ /**
+ * Used to access various objects. Overridden by junit tests.
+ */
+ public static class Factory {
+
+ /**
+ * Indicates that the {@link FpManager#run()} method has started. This method
+ * simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void runStarted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
+ * Sleeps for a period of time.
+ * @param sleepMs amount of time to sleep, in milliseconds
+ * @throws InterruptedException
+ */
+ public void doSleep(long sleepMs) throws InterruptedException {
+ Thread.sleep(sleepMs);
+ }
+
+ /**
+ * Indicates that a monitor activity has completed. This method simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void monitorCompleted() throws InterruptedException {
+ // does nothing
+ }
+ }
+
/*
* The remaining methods are used by JUnit tests.
*/
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
index 7f1e5516..091dcc91 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
@@ -22,23 +22,19 @@ package org.onap.policy.common.im;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
import java.util.Date;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.Semaphore;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.persistence.TemporalType;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.onap.policy.common.im.IntegrityMonitor.Factory;
import org.onap.policy.common.im.jpa.ForwardProgressEntity;
import org.onap.policy.common.im.jpa.ResourceRegistrationEntity;
import org.onap.policy.common.im.jpa.StateManagementEntity;
@@ -57,7 +53,8 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
private static EntityTransaction et;
private static String resourceName;
- private BlockingQueue<CountDownLatch> queue;
+ private Semaphore monitorSem;
+ private Semaphore junitSem;
/**
* Set up for test class.
@@ -900,9 +897,36 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception {
IntegrityMonitor.deleteInstance();
- queue = new LinkedBlockingQueue<>();
+ monitorSem = new Semaphore(0);
+ junitSem = new Semaphore(0);
+
+ Factory factory = new IntegrityMonitor.Factory() {
+
+ @Override
+ public void doSleep(long sleepMs) throws InterruptedException {
+ /*
+ * No need to sleep, as the thread won't progress until the
+ * semaphore is released.
+ */
+ }
+
+ @Override
+ public void runStarted() throws InterruptedException {
+ monitorSem.acquire();
+
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ @Override
+ public void monitorCompleted() throws InterruptedException {
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ };
- IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue);
+ IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory);
// wait for the monitor thread to start
waitStep();
@@ -916,8 +940,7 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
* @throws InterruptedException if the thread is interrupted
*/
private void waitStep() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- queue.offer(latch);
- waitLatch(latch);
+ monitorSem.release();
+ waitSem(junitSem);
}
}
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
index 0c8259b7..e5562306 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
@@ -22,17 +22,14 @@ package org.onap.policy.common.im;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
-
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.slf4j.Logger;
@@ -243,14 +240,14 @@ public class IntegrityMonitorTestBase {
}
/**
- * Waits for a latch to reach zero.
+ * Waits for a semaphore to be acquired
*
- * @param latch the latch
+ * @param sem the latch
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the latch did not reach zero in the allotted time
*/
- protected void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+ protected void waitSem(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
}
/**