diff options
3 files changed, 115 insertions, 117 deletions
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java index f1839b12..58ed8b99 100644 --- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java +++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java @@ -25,10 +25,7 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import org.onap.policy.common.ia.jpa.IntegrityAuditEntity; import org.onap.policy.common.logging.eelf.MessageCodes; import org.onap.policy.common.logging.flexlogger.FlexLogger; @@ -108,18 +105,6 @@ public class AuditThread extends Thread { private IntegrityAudit integrityAudit; /** - * A latch is taken from this queue before starting an audit. May be {@code null}. Used by JUnit - * tests. - */ - private BlockingQueue<CountDownLatch> auditLatchQueue; - - /** - * Latch to be decremented when the next audit completes. May be {@code null}. Used by JUnit - * tests to wait for an audit to complete. - */ - private CountDownLatch auditCompletionLatch = null; - - /** * AuditThread constructor. * * @param resourceName the resource name @@ -133,7 +118,7 @@ public class AuditThread extends Thread { int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws IntegrityAuditException { this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds), - integrityAudit, null); + integrityAudit); } /** @@ -148,13 +133,12 @@ public class AuditThread extends Thread { * @throws IntegrityAuditException if an error occurs */ public AuditThread(String resourceName, String persistenceUnit, Properties properties, long integrityAuditMillis, - IntegrityAudit integrityAudit, BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException { + IntegrityAudit integrityAudit) throws IntegrityAuditException { this.resourceName = resourceName; this.persistenceUnit = persistenceUnit; this.properties = properties; this.integrityAuditPeriodMillis = integrityAuditMillis; this.integrityAudit = integrityAudit; - this.auditLatchQueue = queue; /* * The DbDAO Constructor registers this node in the IntegrityAuditEntity table. Each @@ -174,14 +158,8 @@ public class AuditThread extends Thread { logger.info("AuditThread.run: Entering"); try { - /* - * For JUnit testing: wait for the first latch, decrement it to indicate that the thread - * has started, and then wait for the next latch, before we actually start doing - * anything. These simply return if there is no latch queue defined. - */ - getNextLatch(); - decrementLatch(); - getNextLatch(); + // for junit testing + runStarted(); /* * Triggers change in designation, unless no other viable candidate. @@ -284,11 +262,8 @@ public class AuditThread extends Thread { * property, otherwise just sleep the normal interval. */ if (auditCompleted) { - // indicate that an audit has completed - decrementLatch(); - - // don't start the next audit cycle until a latch has been provided - getNextLatch(); + // for junit testing: indicate that an audit has completed + auditCompleted(); if (logger.isDebugEnabled()) { logger.debug("AuditThread.run: Audit completed; resourceName=" + this.resourceName @@ -342,29 +317,6 @@ public class AuditThread extends Thread { } /** - * Gets the next audit-completion latch from the queue. Blocks, if the queue is empty. - * - * @throws InterruptedException if interrupted while waiting - */ - private void getNextLatch() throws InterruptedException { - BlockingQueue<CountDownLatch> queue = this.auditLatchQueue; - if (queue != null) { - this.auditCompletionLatch = queue.take(); - } - } - - /** - * Decrements the current audit-completion latch, if any. - */ - private void decrementLatch() { - CountDownLatch latch = this.auditCompletionLatch; - if (latch != null) { - this.auditCompletionLatch = null; - latch.countDown(); - } - } - - /** * Determines if an exception is an InterruptedException or was caused by an * InterruptedException. * @@ -788,6 +740,26 @@ public class AuditThread extends Thread { } /** + * Indicates that the {@link #run()} method has started. This method simply returns, + * and may overridden by junit tests. + * + * @throws InterruptedException + */ + public void runStarted() throws InterruptedException { + // does nothing + } + + /** + * Indicates that an audit has completed. This method simply returns, and may + * overridden by junit tests. + * + * @throws InterruptedException + */ + public void auditCompleted() throws InterruptedException { + // does nothing + } + + /** * Adjusts the thread-sleep-interval to be used when an audit has <i>not</i> been completed. * Used by JUnit tests. * diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java index 9abdbe52..b3330faf 100644 --- a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java +++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java @@ -21,9 +21,6 @@ package org.onap.policy.common.ia; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; - import org.onap.policy.common.ia.IntegrityAuditProperties.NodeTypeEnum; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; @@ -217,37 +214,20 @@ public class IntegrityAudit { * @throws IntegrityAuditException if an error occurs */ public void startAuditThread() throws IntegrityAuditException { - startAuditThread(null); - } - - /** - * Starts the audit thread. - * - * @param queue the queue - * @return {@code true} if the thread was started, {@code false} otherwise - * @throws IntegrityAuditException if an error occurs - */ - protected boolean startAuditThread(BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException { - logger.info("startAuditThread: Entering"); - boolean success = false; - if (integrityAuditPeriodMillis >= 0) { - this.auditThread = new AuditThread(this.resourceName, this.persistenceUnit, this.properties, - integrityAuditPeriodMillis, this, queue); + this.auditThread = makeAuditThread(this.resourceName, this.persistenceUnit, this.properties, integrityAuditPeriodMillis); logger.info("startAuditThread: Audit started and will run every " + integrityAuditPeriodMillis / 1000 + " seconds"); this.auditThread.start(); - success = true; + } else { logger.info("startAuditThread: Suppressing integrity audit, integrityAuditPeriodSeconds=" + integrityAuditPeriodMillis / 1000); } logger.info("startAuditThread: Exiting"); - - return success; } /** @@ -299,4 +279,29 @@ public class IntegrityAudit { return !this.auditThread.isAlive(); } } + + /** + * + * @return {@code true} if an audit thread exists, {@code false} otherwise + */ + protected boolean haveAuditThread() { + return (this.auditThread != null); + } + + /** + * Creates an audit thread. May be overridden by junit tests. + * + * @param resourceName2 + * @param persistenceUnit2 + * @param properties2 + * @param integrityAuditPeriodMillis2 + * + * @return a new audit thread + * @throws IntegrityAuditException + */ + protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2, + long integrityAuditPeriodMillis2) throws IntegrityAuditException { + + return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this); + } } diff --git a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java index afbcc452..c9179908 100644 --- a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java +++ b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java @@ -22,10 +22,6 @@ package org.onap.policy.common.ia; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; - import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; @@ -34,21 +30,19 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import javax.persistence.EntityTransaction; import javax.persistence.Persistence; - import org.onap.policy.common.utils.jpa.EntityMgrCloser; import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser; import org.onap.policy.common.utils.jpa.EntityTransCloser; import org.onap.policy.common.utils.test.log.logback.ExtractAppender; import org.slf4j.LoggerFactory; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; /** * All JUnits are designed to run in the local development environment where they have write @@ -464,26 +458,26 @@ public class IntegrityAuditTestBase { protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException { // start an audit cycle on each auditor - List<CountDownLatch> latches = new ArrayList<>(auditors.length); + List<Semaphore> semaphores = new ArrayList<>(auditors.length); for (MyIntegrityAudit p : auditors) { - latches.add(p.startAudit()); + semaphores.add(p.startAudit()); } // wait for each auditor to complete its cycle - for (CountDownLatch latch : latches) { - waitLatch(latch); + for (Semaphore sem : semaphores) { + waitSem(sem); } } /** - * Waits for a latch to reach zero. + * Waits for a semaphore to be released. * - * @param latch the latch to wait for + * @param sem the semaphore for which to wait * @throws InterruptedException if the thread is interrupted * @throws AssertionError if the latch did not reach zero in the allotted time */ - protected void waitLatch(CountDownLatch latch) throws InterruptedException { - assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS)); + protected void waitSem(Semaphore sem) throws InterruptedException { + assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.SECONDS)); } /** @@ -520,11 +514,16 @@ public class IntegrityAuditTestBase { * Manages audits by inserting latches into a queue for the AuditThread to count. */ protected class MyIntegrityAudit extends IntegrityAudit { - + + /** + * Semaphore on which the audit thread should wait. + */ + private Semaphore auditSem = null; + /** - * Queue from which the AuditThread will take latches. + * Semaphore on which the junit management thread should wait. */ - private BlockingQueue<CountDownLatch> queue = null; + private Semaphore junitSem = null; /** * Constructs an auditor and starts the AuditThread. @@ -550,16 +549,14 @@ public class IntegrityAuditTestBase { } /** - * Triggers an audit by adding a latch to the queue. + * Triggers an audit by releasing the audit thread's semaphore. * - * @return the latch that was added + * @return the semaphore on which to wait * @throws InterruptedException if the thread is interrupted */ - public CountDownLatch startAudit() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - queue.add(latch); - - return latch; + public Semaphore startAudit() throws InterruptedException { + auditSem.release(); + return junitSem; } /** @@ -567,25 +564,23 @@ public class IntegrityAuditTestBase { */ @Override public final void startAuditThread() throws IntegrityAuditException { - if (queue != null) { - // queue up a bogus latch, in case a thread is still running - queue.add(new CountDownLatch(1) { - @Override - public void countDown() { - throw new RuntimeException("auditor has multiple threads"); - } - }); + if (auditSem != null) { + // release a bunch of semaphores, in case a thread is still running + auditSem.release(1000); } + + auditSem = new Semaphore(0); + junitSem = new Semaphore(0); + + super.startAuditThread(); - queue = new LinkedBlockingQueue<>(); + if (haveAuditThread()) { + // tell the thread it can run + auditSem.release(); - if (super.startAuditThread(queue)) { // wait for the thread to start - CountDownLatch latch = new CountDownLatch(1); - queue.add(latch); - try { - waitLatch(latch); + waitSem(junitSem); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -605,5 +600,31 @@ public class IntegrityAuditTestBase { assertTrue(waitThread(this)); } + + @Override + protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2, + long integrityAuditPeriodMillis2) throws IntegrityAuditException { + + return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this) { + + private Semaphore auditSem = MyIntegrityAudit.this.auditSem; + private Semaphore junitSem = MyIntegrityAudit.this.junitSem; + + @Override + public void runStarted() throws InterruptedException { + auditSem.acquire(); + + junitSem.release(); + auditSem.acquire(); + } + + @Override + public void auditCompleted() throws InterruptedException { + junitSem.release(); + auditSem.acquire(); + } + + }; + } } } |