summaryrefslogtreecommitdiffstats
path: root/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java')
-rw-r--r--integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java170
1 files changed, 154 insertions, 16 deletions
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
index 78ff4d3a..7af82132 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Integrity Audit
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,9 @@ import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.onap.policy.common.ia.jpa.IntegrityAuditEntity;
import org.onap.policy.common.logging.eelf.MessageCodes;
@@ -59,9 +62,15 @@ public class AuditThread extends Thread {
/*
* Unless audit has already been run on this entity, number of milliseconds
* to sleep between audit thread iterations. If audit has already been run,
- * we sleep integrityAuditPeriodMillis.
+ * we sleep integrityAuditPeriodMillis. May be modified by JUnit tests.
*/
- private static final long AUDIT_THREAD_SLEEP_INTERVAL = 5000;
+ private static long auditThreadSleepIntervalMillis = 5000;
+
+ /*
+ * Number of milliseconds that must elapse for audit to be considered
+ * complete. May be modified by JUnit tests.
+ */
+ private static long auditCompletionIntervalMillis = AUDIT_COMPLETION_INTERVAL;
/*
* DB access class.
@@ -91,7 +100,7 @@ public class AuditThread extends Thread {
/*
* See IntegrityAudit class for usage.
*/
- private int integrityAuditPeriodMillis;
+ private long integrityAuditPeriodMillis;
/*
* The containing IntegrityAudit instance
@@ -99,6 +108,18 @@ public class AuditThread extends Thread {
private IntegrityAudit integrityAudit;
/**
+ * A latch is taken from this queue before starting an audit. May be
+ * {@code null}. Used by JUnit tests.
+ */
+ private BlockingQueue<CountDownLatch> auditLatchQueue;
+
+ /**
+ * Latch to be decremented when the next audit completes. May be
+ * {@code null}. Used by JUnit tests to wait for an audit to complete.
+ */
+ private CountDownLatch auditCompletionLatch = null;
+
+ /**
* AuditThread constructor
* @param resourceName
* @param persistenceUnit
@@ -110,11 +131,31 @@ public class AuditThread extends Thread {
public AuditThread(String resourceName, String persistenceUnit,
Properties properties, int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit)
throws Exception {
+
+ this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds),
+ integrityAudit, null);
+ }
+
+ /**
+ * AuditThread constructor
+ * @param resourceName
+ * @param persistenceUnit
+ * @param properties
+ * @param integrityAuditMillis
+ * @param integrityAudit
+ * @param queue
+ * @throws Exception
+ */
+ public AuditThread(String resourceName, String persistenceUnit,
+ Properties properties, long integrityAuditMillis, IntegrityAudit integrityAudit,
+ BlockingQueue<CountDownLatch> queue)
+ throws Exception {
this.resourceName = resourceName;
this.persistenceUnit = persistenceUnit;
this.properties = properties;
- this.integrityAuditPeriodMillis = integrityAuditPeriodSeconds * 1000;
+ this.integrityAuditPeriodMillis = integrityAuditMillis;
this.integrityAudit = integrityAudit;
+ this.auditLatchQueue = queue;
/*
* The DbDAO Constructor registers this node in the IntegrityAuditEntity
@@ -135,7 +176,16 @@ public class AuditThread extends Thread {
logger.info("AuditThread.run: Entering");
- try {
+ try {
+ /*
+ * For JUnit testing: wait for the first latch, decrement it to
+ * indicate that the thread has started, and then wait for the
+ * next latch, before we actually start doing anything. These
+ * simply return if there is no latch queue defined.
+ */
+ getNextLatch();
+ decrementLatch();
+ getNextLatch();
/*
* Triggers change in designation, unless no other viable candidate.
@@ -151,7 +201,6 @@ public class AuditThread extends Thread {
while (true) {
try{
-
/*
* It may have been awhile since we last cycled through this
* loop, so refresh the list of IntegrityAuditEntities.
@@ -249,6 +298,11 @@ public class AuditThread extends Thread {
* the normal interval.
*/
if (auditCompleted) {
+ // indicate that an audit has completed
+ decrementLatch();
+
+ // don't start the next audit cycle until a latch has been provided
+ getNextLatch();
if (logger.isDebugEnabled()) {
logger.debug("AuditThread.run: Audit completed; resourceName="
@@ -268,25 +322,32 @@ public class AuditThread extends Thread {
if (logger.isDebugEnabled()) {
logger.debug("AuditThread.run: resourceName="
+ this.resourceName + ": Sleeping "
- + AuditThread.AUDIT_THREAD_SLEEP_INTERVAL
+ + AuditThread.auditThreadSleepIntervalMillis
+ "ms");
}
- Thread.sleep(AuditThread.AUDIT_THREAD_SLEEP_INTERVAL);
+ Thread.sleep(AuditThread.auditThreadSleepIntervalMillis);
if (logger.isDebugEnabled()) {
logger.debug("AuditThread.run: resourceName="
+ this.resourceName + ": Awaking from "
- + AuditThread.AUDIT_THREAD_SLEEP_INTERVAL
+ + AuditThread.auditThreadSleepIntervalMillis
+ "ms sleep");
}
}
+
} catch (Exception e){
+ if(isInterruptedException(e)) {
+ String msg = "AuditThread.run loop - Exception thrown: " + e.getMessage()
+ + "; Stopping.";
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, msg);
+ break;
+ }
+
String msg = "AuditThread.run loop - Exception thrown: " + e.getMessage()
+ "; Will try audit again in " + (integrityAuditPeriodMillis/1000) + " seconds";
logger.error(MessageCodes.EXCEPTION_ERROR, e, msg);
// Sleep and try again later
Thread.sleep(integrityAuditPeriodMillis);
- continue;
}
}
@@ -296,10 +357,53 @@ public class AuditThread extends Thread {
logger.error(MessageCodes.EXCEPTION_ERROR, e, msg);
integrityAudit.setThreadInitialized(false);
}
+
+ dbDAO.destroy();
logger.info("AuditThread.run: Exiting");
}
+ /**
+ * Gets the next audit-completion latch from the queue. Blocks, if the
+ * queue is empty.
+ * @throws InterruptedException
+ */
+ private void getNextLatch() throws InterruptedException {
+ BlockingQueue<CountDownLatch> queue = this.auditLatchQueue;
+ if(queue != null) {
+ this.auditCompletionLatch = queue.take();
+ }
+ }
+
+ /**
+ * Decrements the current audit-completion latch, if any.
+ */
+ private void decrementLatch() {
+ CountDownLatch latch = this.auditCompletionLatch;
+ if(latch != null) {
+ this.auditCompletionLatch = null;
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Determines if an exception is an InterruptedException or was caused
+ * by an InterruptedException.
+ * @param ex exception to be examined
+ * @return {@code true} if it's an InterruptedException, {@code false} otherwise
+ */
+ private boolean isInterruptedException(Throwable ex) {
+ while(ex != null) {
+ if(ex instanceof InterruptedException) {
+ return true;
+ }
+
+ ex = ex.getCause();
+ }
+
+ return false;
+ }
+
/*
* Used to create a list that is sorted lexicographically by resourceName.
*/
@@ -617,7 +721,7 @@ public class AuditThread extends Thread {
/**
* Returns false if the lastUpdated time for the record in question is more
- * than AUDIT_COMPLETION_INTERVAL seconds ago. During an audit, lastUpdated is updated every five
+ * than auditCompletionIntervalMillis seconds ago. During an audit, lastUpdated is updated every five
* seconds or so, but when an audit finishes, the node doing the audit stops
* updating lastUpdated.
* @param integrityAuditEntity
@@ -647,7 +751,7 @@ public class AuditThread extends Thread {
lastUpdatedTime = lastUpdated.getTime();
}
long timeDifference = currentTime.getTime() - lastUpdatedTime;
- if (timeDifference > AUDIT_COMPLETION_INTERVAL) {
+ if (timeDifference > auditCompletionIntervalMillis) {
stale = true;
}
@@ -678,7 +782,7 @@ public class AuditThread extends Thread {
}
/*
- * If more than (AUDIT_COMPLETION_INTERVAL * 2) milliseconds have elapsed
+ * If more than (auditCompletionIntervalMillis * 2) milliseconds have elapsed
* since we last ran the audit, reset auditCompleted, so
*
* 1) we'll eventually re-run the audit, if no other node picks up the
@@ -707,7 +811,7 @@ public class AuditThread extends Thread {
long lastUpdatedTime = lastUpdated.getTime();
timeDifference = currentTime.getTime() - lastUpdatedTime;
- if (timeDifference > (AUDIT_COMPLETION_INTERVAL * 2)) {
+ if (timeDifference > (auditCompletionIntervalMillis * 2)) {
if (logger.isDebugEnabled()) {
logger.debug("resetAuditCompleted: Resetting auditCompleted for resourceName="
+ this.resourceName);
@@ -750,7 +854,7 @@ public class AuditThread extends Thread {
+ this.resourceName);
}
if (IntegrityAudit.isUnitTesting()) {
- dbAudit.dbAuditSimulate(this.resourceName, this.persistenceUnit);
+ dbAudit.dbAuditSimulate(this.resourceName, this.persistenceUnit, AuditThread.AUDIT_SIMULATION_ITERATIONS, AuditThread.auditThreadSleepIntervalMillis);
} else {
dbAudit.dbAudit(this.resourceName, this.persistenceUnit,
this.nodeType);
@@ -762,4 +866,38 @@ public class AuditThread extends Thread {
}
+ /**
+ * Adjusts the thread-sleep-interval to be used when an audit has
+ * <i>not</i> been completed. Used by JUnit tests.
+ * @param auditThreadSleepIntervalMillis
+ */
+ protected static void setAuditThreadSleepIntervalMillis(long auditThreadSleepIntervalMillis) {
+ AuditThread.auditThreadSleepIntervalMillis = auditThreadSleepIntervalMillis;
+ }
+
+ /**
+ * Gets the current thread-sleep-interval to be used when an audit has
+ * <i>not</i> been completed. Used by JUnit tests.
+ * @return the current sleep interval, in milli-seconds
+ */
+ protected static long getAuditThreadSleepIntervalMillis() {
+ return auditThreadSleepIntervalMillis;
+ }
+
+ /**
+ * Adjusts the audit-completion-interval. Used by JUnit tests.
+ * @param auditThreadSleepIntervalMillis
+ */
+ protected static void setAuditCompletionIntervalMillis(long auditThreadSleepIntervalMillis) {
+ AuditThread.auditCompletionIntervalMillis = auditThreadSleepIntervalMillis;
+ }
+
+ /**
+ * Gets the audit-completion-interval. Used by JUnit tests.
+ * @return the current audit-completion interval, in milli-seconds
+ */
+ protected static long getAuditCompletionIntervalMillis() {
+ return auditCompletionIntervalMillis;
+ }
+
}