diff options
Diffstat (limited to 'integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java')
-rw-r--r-- | integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java | 170 |
1 files changed, 154 insertions, 16 deletions
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java index 78ff4d3a..7af82132 100644 --- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java +++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Integrity Audit * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,9 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.onap.policy.common.ia.jpa.IntegrityAuditEntity; import org.onap.policy.common.logging.eelf.MessageCodes; @@ -59,9 +62,15 @@ public class AuditThread extends Thread { /* * Unless audit has already been run on this entity, number of milliseconds * to sleep between audit thread iterations. If audit has already been run, - * we sleep integrityAuditPeriodMillis. + * we sleep integrityAuditPeriodMillis. May be modified by JUnit tests. */ - private static final long AUDIT_THREAD_SLEEP_INTERVAL = 5000; + private static long auditThreadSleepIntervalMillis = 5000; + + /* + * Number of milliseconds that must elapse for audit to be considered + * complete. May be modified by JUnit tests. + */ + private static long auditCompletionIntervalMillis = AUDIT_COMPLETION_INTERVAL; /* * DB access class. @@ -91,7 +100,7 @@ public class AuditThread extends Thread { /* * See IntegrityAudit class for usage. */ - private int integrityAuditPeriodMillis; + private long integrityAuditPeriodMillis; /* * The containing IntegrityAudit instance @@ -99,6 +108,18 @@ public class AuditThread extends Thread { private IntegrityAudit integrityAudit; /** + * A latch is taken from this queue before starting an audit. May be + * {@code null}. Used by JUnit tests. + */ + private BlockingQueue<CountDownLatch> auditLatchQueue; + + /** + * Latch to be decremented when the next audit completes. May be + * {@code null}. Used by JUnit tests to wait for an audit to complete. + */ + private CountDownLatch auditCompletionLatch = null; + + /** * AuditThread constructor * @param resourceName * @param persistenceUnit @@ -110,11 +131,31 @@ public class AuditThread extends Thread { public AuditThread(String resourceName, String persistenceUnit, Properties properties, int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws Exception { + + this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds), + integrityAudit, null); + } + + /** + * AuditThread constructor + * @param resourceName + * @param persistenceUnit + * @param properties + * @param integrityAuditMillis + * @param integrityAudit + * @param queue + * @throws Exception + */ + public AuditThread(String resourceName, String persistenceUnit, + Properties properties, long integrityAuditMillis, IntegrityAudit integrityAudit, + BlockingQueue<CountDownLatch> queue) + throws Exception { this.resourceName = resourceName; this.persistenceUnit = persistenceUnit; this.properties = properties; - this.integrityAuditPeriodMillis = integrityAuditPeriodSeconds * 1000; + this.integrityAuditPeriodMillis = integrityAuditMillis; this.integrityAudit = integrityAudit; + this.auditLatchQueue = queue; /* * The DbDAO Constructor registers this node in the IntegrityAuditEntity @@ -135,7 +176,16 @@ public class AuditThread extends Thread { logger.info("AuditThread.run: Entering"); - try { + try { + /* + * For JUnit testing: wait for the first latch, decrement it to + * indicate that the thread has started, and then wait for the + * next latch, before we actually start doing anything. These + * simply return if there is no latch queue defined. + */ + getNextLatch(); + decrementLatch(); + getNextLatch(); /* * Triggers change in designation, unless no other viable candidate. @@ -151,7 +201,6 @@ public class AuditThread extends Thread { while (true) { try{ - /* * It may have been awhile since we last cycled through this * loop, so refresh the list of IntegrityAuditEntities. @@ -249,6 +298,11 @@ public class AuditThread extends Thread { * the normal interval. */ if (auditCompleted) { + // indicate that an audit has completed + decrementLatch(); + + // don't start the next audit cycle until a latch has been provided + getNextLatch(); if (logger.isDebugEnabled()) { logger.debug("AuditThread.run: Audit completed; resourceName=" @@ -268,25 +322,32 @@ public class AuditThread extends Thread { if (logger.isDebugEnabled()) { logger.debug("AuditThread.run: resourceName=" + this.resourceName + ": Sleeping " - + AuditThread.AUDIT_THREAD_SLEEP_INTERVAL + + AuditThread.auditThreadSleepIntervalMillis + "ms"); } - Thread.sleep(AuditThread.AUDIT_THREAD_SLEEP_INTERVAL); + Thread.sleep(AuditThread.auditThreadSleepIntervalMillis); if (logger.isDebugEnabled()) { logger.debug("AuditThread.run: resourceName=" + this.resourceName + ": Awaking from " - + AuditThread.AUDIT_THREAD_SLEEP_INTERVAL + + AuditThread.auditThreadSleepIntervalMillis + "ms sleep"); } } + } catch (Exception e){ + if(isInterruptedException(e)) { + String msg = "AuditThread.run loop - Exception thrown: " + e.getMessage() + + "; Stopping."; + logger.error(MessageCodes.EXCEPTION_ERROR, e, msg); + break; + } + String msg = "AuditThread.run loop - Exception thrown: " + e.getMessage() + "; Will try audit again in " + (integrityAuditPeriodMillis/1000) + " seconds"; logger.error(MessageCodes.EXCEPTION_ERROR, e, msg); // Sleep and try again later Thread.sleep(integrityAuditPeriodMillis); - continue; } } @@ -296,10 +357,53 @@ public class AuditThread extends Thread { logger.error(MessageCodes.EXCEPTION_ERROR, e, msg); integrityAudit.setThreadInitialized(false); } + + dbDAO.destroy(); logger.info("AuditThread.run: Exiting"); } + /** + * Gets the next audit-completion latch from the queue. Blocks, if the + * queue is empty. + * @throws InterruptedException + */ + private void getNextLatch() throws InterruptedException { + BlockingQueue<CountDownLatch> queue = this.auditLatchQueue; + if(queue != null) { + this.auditCompletionLatch = queue.take(); + } + } + + /** + * Decrements the current audit-completion latch, if any. + */ + private void decrementLatch() { + CountDownLatch latch = this.auditCompletionLatch; + if(latch != null) { + this.auditCompletionLatch = null; + latch.countDown(); + } + } + + /** + * Determines if an exception is an InterruptedException or was caused + * by an InterruptedException. + * @param ex exception to be examined + * @return {@code true} if it's an InterruptedException, {@code false} otherwise + */ + private boolean isInterruptedException(Throwable ex) { + while(ex != null) { + if(ex instanceof InterruptedException) { + return true; + } + + ex = ex.getCause(); + } + + return false; + } + /* * Used to create a list that is sorted lexicographically by resourceName. */ @@ -617,7 +721,7 @@ public class AuditThread extends Thread { /** * Returns false if the lastUpdated time for the record in question is more - * than AUDIT_COMPLETION_INTERVAL seconds ago. During an audit, lastUpdated is updated every five + * than auditCompletionIntervalMillis seconds ago. During an audit, lastUpdated is updated every five * seconds or so, but when an audit finishes, the node doing the audit stops * updating lastUpdated. * @param integrityAuditEntity @@ -647,7 +751,7 @@ public class AuditThread extends Thread { lastUpdatedTime = lastUpdated.getTime(); } long timeDifference = currentTime.getTime() - lastUpdatedTime; - if (timeDifference > AUDIT_COMPLETION_INTERVAL) { + if (timeDifference > auditCompletionIntervalMillis) { stale = true; } @@ -678,7 +782,7 @@ public class AuditThread extends Thread { } /* - * If more than (AUDIT_COMPLETION_INTERVAL * 2) milliseconds have elapsed + * If more than (auditCompletionIntervalMillis * 2) milliseconds have elapsed * since we last ran the audit, reset auditCompleted, so * * 1) we'll eventually re-run the audit, if no other node picks up the @@ -707,7 +811,7 @@ public class AuditThread extends Thread { long lastUpdatedTime = lastUpdated.getTime(); timeDifference = currentTime.getTime() - lastUpdatedTime; - if (timeDifference > (AUDIT_COMPLETION_INTERVAL * 2)) { + if (timeDifference > (auditCompletionIntervalMillis * 2)) { if (logger.isDebugEnabled()) { logger.debug("resetAuditCompleted: Resetting auditCompleted for resourceName=" + this.resourceName); @@ -750,7 +854,7 @@ public class AuditThread extends Thread { + this.resourceName); } if (IntegrityAudit.isUnitTesting()) { - dbAudit.dbAuditSimulate(this.resourceName, this.persistenceUnit); + dbAudit.dbAuditSimulate(this.resourceName, this.persistenceUnit, AuditThread.AUDIT_SIMULATION_ITERATIONS, AuditThread.auditThreadSleepIntervalMillis); } else { dbAudit.dbAudit(this.resourceName, this.persistenceUnit, this.nodeType); @@ -762,4 +866,38 @@ public class AuditThread extends Thread { } + /** + * Adjusts the thread-sleep-interval to be used when an audit has + * <i>not</i> been completed. Used by JUnit tests. + * @param auditThreadSleepIntervalMillis + */ + protected static void setAuditThreadSleepIntervalMillis(long auditThreadSleepIntervalMillis) { + AuditThread.auditThreadSleepIntervalMillis = auditThreadSleepIntervalMillis; + } + + /** + * Gets the current thread-sleep-interval to be used when an audit has + * <i>not</i> been completed. Used by JUnit tests. + * @return the current sleep interval, in milli-seconds + */ + protected static long getAuditThreadSleepIntervalMillis() { + return auditThreadSleepIntervalMillis; + } + + /** + * Adjusts the audit-completion-interval. Used by JUnit tests. + * @param auditThreadSleepIntervalMillis + */ + protected static void setAuditCompletionIntervalMillis(long auditThreadSleepIntervalMillis) { + AuditThread.auditCompletionIntervalMillis = auditThreadSleepIntervalMillis; + } + + /** + * Gets the audit-completion-interval. Used by JUnit tests. + * @return the current audit-completion interval, in milli-seconds + */ + protected static long getAuditCompletionIntervalMillis() { + return auditCompletionIntervalMillis; + } + } |