aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java80
-rw-r--r--integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java49
-rw-r--r--integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java103
3 files changed, 115 insertions, 117 deletions
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
index f1839b12..58ed8b99 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
@@ -25,10 +25,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import org.onap.policy.common.ia.jpa.IntegrityAuditEntity;
import org.onap.policy.common.logging.eelf.MessageCodes;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
@@ -108,18 +105,6 @@ public class AuditThread extends Thread {
private IntegrityAudit integrityAudit;
/**
- * A latch is taken from this queue before starting an audit. May be {@code null}. Used by JUnit
- * tests.
- */
- private BlockingQueue<CountDownLatch> auditLatchQueue;
-
- /**
- * Latch to be decremented when the next audit completes. May be {@code null}. Used by JUnit
- * tests to wait for an audit to complete.
- */
- private CountDownLatch auditCompletionLatch = null;
-
- /**
* AuditThread constructor.
*
* @param resourceName the resource name
@@ -133,7 +118,7 @@ public class AuditThread extends Thread {
int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws IntegrityAuditException {
this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds),
- integrityAudit, null);
+ integrityAudit);
}
/**
@@ -148,13 +133,12 @@ public class AuditThread extends Thread {
* @throws IntegrityAuditException if an error occurs
*/
public AuditThread(String resourceName, String persistenceUnit, Properties properties, long integrityAuditMillis,
- IntegrityAudit integrityAudit, BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
+ IntegrityAudit integrityAudit) throws IntegrityAuditException {
this.resourceName = resourceName;
this.persistenceUnit = persistenceUnit;
this.properties = properties;
this.integrityAuditPeriodMillis = integrityAuditMillis;
this.integrityAudit = integrityAudit;
- this.auditLatchQueue = queue;
/*
* The DbDAO Constructor registers this node in the IntegrityAuditEntity table. Each
@@ -174,14 +158,8 @@ public class AuditThread extends Thread {
logger.info("AuditThread.run: Entering");
try {
- /*
- * For JUnit testing: wait for the first latch, decrement it to indicate that the thread
- * has started, and then wait for the next latch, before we actually start doing
- * anything. These simply return if there is no latch queue defined.
- */
- getNextLatch();
- decrementLatch();
- getNextLatch();
+ // for junit testing
+ runStarted();
/*
* Triggers change in designation, unless no other viable candidate.
@@ -284,11 +262,8 @@ public class AuditThread extends Thread {
* property, otherwise just sleep the normal interval.
*/
if (auditCompleted) {
- // indicate that an audit has completed
- decrementLatch();
-
- // don't start the next audit cycle until a latch has been provided
- getNextLatch();
+ // for junit testing: indicate that an audit has completed
+ auditCompleted();
if (logger.isDebugEnabled()) {
logger.debug("AuditThread.run: Audit completed; resourceName=" + this.resourceName
@@ -342,29 +317,6 @@ public class AuditThread extends Thread {
}
/**
- * Gets the next audit-completion latch from the queue. Blocks, if the queue is empty.
- *
- * @throws InterruptedException if interrupted while waiting
- */
- private void getNextLatch() throws InterruptedException {
- BlockingQueue<CountDownLatch> queue = this.auditLatchQueue;
- if (queue != null) {
- this.auditCompletionLatch = queue.take();
- }
- }
-
- /**
- * Decrements the current audit-completion latch, if any.
- */
- private void decrementLatch() {
- CountDownLatch latch = this.auditCompletionLatch;
- if (latch != null) {
- this.auditCompletionLatch = null;
- latch.countDown();
- }
- }
-
- /**
* Determines if an exception is an InterruptedException or was caused by an
* InterruptedException.
*
@@ -788,6 +740,26 @@ public class AuditThread extends Thread {
}
/**
+ * Indicates that the {@link #run()} method has started. This method simply returns,
+ * and may overridden by junit tests.
+ *
+ * @throws InterruptedException
+ */
+ public void runStarted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
+ * Indicates that an audit has completed. This method simply returns, and may
+ * overridden by junit tests.
+ *
+ * @throws InterruptedException
+ */
+ public void auditCompleted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
* Adjusts the thread-sleep-interval to be used when an audit has <i>not</i> been completed.
* Used by JUnit tests.
*
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
index 9abdbe52..b3330faf 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
@@ -21,9 +21,6 @@
package org.onap.policy.common.ia;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-
import org.onap.policy.common.ia.IntegrityAuditProperties.NodeTypeEnum;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
@@ -217,37 +214,20 @@ public class IntegrityAudit {
* @throws IntegrityAuditException if an error occurs
*/
public void startAuditThread() throws IntegrityAuditException {
- startAuditThread(null);
- }
-
- /**
- * Starts the audit thread.
- *
- * @param queue the queue
- * @return {@code true} if the thread was started, {@code false} otherwise
- * @throws IntegrityAuditException if an error occurs
- */
- protected boolean startAuditThread(BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
-
logger.info("startAuditThread: Entering");
- boolean success = false;
-
if (integrityAuditPeriodMillis >= 0) {
- this.auditThread = new AuditThread(this.resourceName, this.persistenceUnit, this.properties,
- integrityAuditPeriodMillis, this, queue);
+ this.auditThread = makeAuditThread(this.resourceName, this.persistenceUnit, this.properties, integrityAuditPeriodMillis);
logger.info("startAuditThread: Audit started and will run every " + integrityAuditPeriodMillis / 1000
+ " seconds");
this.auditThread.start();
- success = true;
+
} else {
logger.info("startAuditThread: Suppressing integrity audit, integrityAuditPeriodSeconds="
+ integrityAuditPeriodMillis / 1000);
}
logger.info("startAuditThread: Exiting");
-
- return success;
}
/**
@@ -299,4 +279,29 @@ public class IntegrityAudit {
return !this.auditThread.isAlive();
}
}
+
+ /**
+ *
+ * @return {@code true} if an audit thread exists, {@code false} otherwise
+ */
+ protected boolean haveAuditThread() {
+ return (this.auditThread != null);
+ }
+
+ /**
+ * Creates an audit thread. May be overridden by junit tests.
+ *
+ * @param resourceName2
+ * @param persistenceUnit2
+ * @param properties2
+ * @param integrityAuditPeriodMillis2
+ *
+ * @return a new audit thread
+ * @throws IntegrityAuditException
+ */
+ protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+ long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+
+ return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this);
+ }
}
diff --git a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
index afbcc452..c9179908 100644
--- a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
+++ b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
@@ -22,10 +22,6 @@ package org.onap.policy.common.ia;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,21 +30,19 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Persistence;
-
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.slf4j.LoggerFactory;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
/**
* All JUnits are designed to run in the local development environment where they have write
@@ -464,26 +458,26 @@ public class IntegrityAuditTestBase {
protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException {
// start an audit cycle on each auditor
- List<CountDownLatch> latches = new ArrayList<>(auditors.length);
+ List<Semaphore> semaphores = new ArrayList<>(auditors.length);
for (MyIntegrityAudit p : auditors) {
- latches.add(p.startAudit());
+ semaphores.add(p.startAudit());
}
// wait for each auditor to complete its cycle
- for (CountDownLatch latch : latches) {
- waitLatch(latch);
+ for (Semaphore sem : semaphores) {
+ waitSem(sem);
}
}
/**
- * Waits for a latch to reach zero.
+ * Waits for a semaphore to be released.
*
- * @param latch the latch to wait for
+ * @param sem the semaphore for which to wait
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the latch did not reach zero in the allotted time
*/
- protected void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+ protected void waitSem(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.SECONDS));
}
/**
@@ -520,11 +514,16 @@ public class IntegrityAuditTestBase {
* Manages audits by inserting latches into a queue for the AuditThread to count.
*/
protected class MyIntegrityAudit extends IntegrityAudit {
-
+
+ /**
+ * Semaphore on which the audit thread should wait.
+ */
+ private Semaphore auditSem = null;
+
/**
- * Queue from which the AuditThread will take latches.
+ * Semaphore on which the junit management thread should wait.
*/
- private BlockingQueue<CountDownLatch> queue = null;
+ private Semaphore junitSem = null;
/**
* Constructs an auditor and starts the AuditThread.
@@ -550,16 +549,14 @@ public class IntegrityAuditTestBase {
}
/**
- * Triggers an audit by adding a latch to the queue.
+ * Triggers an audit by releasing the audit thread's semaphore.
*
- * @return the latch that was added
+ * @return the semaphore on which to wait
* @throws InterruptedException if the thread is interrupted
*/
- public CountDownLatch startAudit() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- queue.add(latch);
-
- return latch;
+ public Semaphore startAudit() throws InterruptedException {
+ auditSem.release();
+ return junitSem;
}
/**
@@ -567,25 +564,23 @@ public class IntegrityAuditTestBase {
*/
@Override
public final void startAuditThread() throws IntegrityAuditException {
- if (queue != null) {
- // queue up a bogus latch, in case a thread is still running
- queue.add(new CountDownLatch(1) {
- @Override
- public void countDown() {
- throw new RuntimeException("auditor has multiple threads");
- }
- });
+ if (auditSem != null) {
+ // release a bunch of semaphores, in case a thread is still running
+ auditSem.release(1000);
}
+
+ auditSem = new Semaphore(0);
+ junitSem = new Semaphore(0);
+
+ super.startAuditThread();
- queue = new LinkedBlockingQueue<>();
+ if (haveAuditThread()) {
+ // tell the thread it can run
+ auditSem.release();
- if (super.startAuditThread(queue)) {
// wait for the thread to start
- CountDownLatch latch = new CountDownLatch(1);
- queue.add(latch);
-
try {
- waitLatch(latch);
+ waitSem(junitSem);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -605,5 +600,31 @@ public class IntegrityAuditTestBase {
assertTrue(waitThread(this));
}
+
+ @Override
+ protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+ long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+
+ return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this) {
+
+ private Semaphore auditSem = MyIntegrityAudit.this.auditSem;
+ private Semaphore junitSem = MyIntegrityAudit.this.junitSem;
+
+ @Override
+ public void runStarted() throws InterruptedException {
+ auditSem.acquire();
+
+ junitSem.release();
+ auditSem.acquire();
+ }
+
+ @Override
+ public void auditCompleted() throws InterruptedException {
+ junitSem.release();
+ auditSem.acquire();
+ }
+
+ };
+ }
}
}