/*
* ============LICENSE_START=======================================================
* Integrity Audit
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.common.ia;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Persistence;
import org.onap.policy.common.utils.jpa.EntityMgrCloser;
import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser;
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.onap.policy.common.utils.time.CurrentTime;
import org.onap.policy.common.utils.time.TestTime;
import org.powermock.reflect.Whitebox;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
/**
* All JUnits are designed to run in the local development environment where they have write
* privileges and can execute time-sensitive tasks.
*
*
Many of the test verification steps are performed by scanning for items written to the log
* file. Rather than actually scan the log file, an {@link ExtractAppender} is used to monitor
* events that are logged and extract relevant items. In order to attach the appender to the debug
* log, it assumes that the debug log is a logback Logger configured per EELF.
*
*
These tests use a temporary, in-memory DB, which is dropped once the tests complete.
*/
public class IntegrityAuditTestBase {
/**
* Root of the debug logger, as defined in the logback-test.xml.
*/
protected static final Logger debugLogger = (Logger) LoggerFactory.getLogger("com.att.eelf.debug");
/**
* Root of the error logger, as defined in the logback-test.xml.
*/
protected static final Logger errorLogger = (Logger) LoggerFactory.getLogger("com.att.eelf.error");
/**
* Directory containing the log files.
*/
private static final String LOG_DIR = "testingLogs/common-modules/integrity-audit";
/**
* Name of the field within the AuditorTime class that supplies the time.
*/
public static final String TIME_SUPPLY_FIELD = "supplier";
/**
* Max time, in milliseconds, to wait for a semaphore.
*/
protected static final long WAIT_MS = 5000L;
/**
* Number of seconds in an audit period.
*/
public static final int AUDIT_PERIOD_SEC = 5;
public static final String DEFAULT_DB_URL_PREFIX = "jdbc:h2:mem:";
protected static final String dbDriver = "org.h2.Driver";
protected static final String dbUser = "testu";
protected static final String dbPwd = "testp";
protected static final String siteName = "SiteA";
protected static final String nodeType = "pdp_xacml";
// will be defined by the test *Classes*
protected static String dbUrl;
/**
* Persistence unit for PDP sequence A.
*/
protected static final String A_SEQ_PU = "testPU";
/**
* Persistence unit for PDP sequence B.
*/
protected static final String B_SEQ_PU = "integrityAuditPU";
/**
* Matches the start of an audit for arbitrary PDPs in the debug log.
*/
protected static final String START_AUDIT_RE = "Running audit for persistenceUnit=\\w+ on resourceName=([^,]*)";
/**
* Properties to be used in all tests.
*/
protected static Properties properties;
/**
* Entity manager factory pointing to the in-memory DB for A_SEQ_PU.
*/
protected static EntityManagerFactory emf;
/**
* Entity manager factory pointing to the in-memory DB associated with emf.
*/
protected static EntityManager em;
/**
* Current time used by given test.
*/
private static ThreadLocal testTime = ThreadLocal.withInitial(() -> null);
/**
* Supplies the test time so that each thread maintains its own notion of "current"
* time.
*/
private static Supplier timeSupplier = () -> testTime.get();
/**
* Saved debug logger level, to be restored once all tests complete.
*/
private static Level savedDebugLevel;
/**
* Saved error logger level, to be restored once all tests complete.
*/
private static Level savedErrorLevel;
/**
* Saved time, to be restored once all tests complete.
*/
private static Supplier savedTime;
/**
* List of auditors whose threads must be stopped when a given test case ends.
*/
private List auditors;
/**
* List of appenders that must be removed from loggers when a given test case ends.
*/
private List appenders;
/**
* Saves current configuration information and then sets new values.
*
* @param dbDriver the name of the DB Driver class
* @param dbUrl the URL to the DB
* @throws IOException if an IO error occurs
*/
protected static void setUpBeforeClass(String dbUrl) throws IOException {
// truncate the logs
new FileOutputStream(LOG_DIR + "/audit.log").close();
new FileOutputStream(LOG_DIR + "/debug.log").close();
new FileOutputStream(LOG_DIR + "/error.log").close();
new FileOutputStream(LOG_DIR + "/metrics.log").close();
IntegrityAuditTestBase.dbUrl = dbUrl;
// save data that we have to restore at the end of the test
savedTime = Whitebox.getInternalState(AuditorTime.class, TIME_SUPPLY_FIELD);
savedDebugLevel = debugLogger.getLevel();
savedErrorLevel = errorLogger.getLevel();
IntegrityAudit.setUnitTesting(true);
properties = new Properties();
properties.put(IntegrityAuditProperties.DB_DRIVER, dbDriver);
properties.put(IntegrityAuditProperties.DB_URL, dbUrl);
properties.put(IntegrityAuditProperties.DB_USER, dbUser);
properties.put(IntegrityAuditProperties.DB_PWD, dbPwd);
properties.put(IntegrityAuditProperties.SITE_NAME, siteName);
properties.put(IntegrityAuditProperties.NODE_TYPE, nodeType);
emf = Persistence.createEntityManagerFactory(A_SEQ_PU, makeProperties());
// keep this open so the in-memory DB stays around until all tests are
// done
em = emf.createEntityManager();
Whitebox.setInternalState(AuditorTime.class, TIME_SUPPLY_FIELD, timeSupplier);
debugLogger.setLevel(Level.DEBUG);
errorLogger.setLevel(Level.ERROR);
}
/**
* Restores the configuration to what it was before the test.
*/
protected static void tearDownAfterClass() {
IntegrityAudit.setUnitTesting(false);
Whitebox.setInternalState(AuditorTime.class, TIME_SUPPLY_FIELD, savedTime);
debugLogger.setLevel(savedDebugLevel);
errorLogger.setLevel(savedErrorLevel);
// this should result in the in-memory DB being deleted
em.close();
emf.close();
}
/**
* Sets up for a test, which includes deleting all records from the IntegrityAuditEntity table.
*/
protected void setUp() {
auditors = new LinkedList<>();
appenders = new LinkedList<>();
properties.put(IntegrityAuditProperties.AUDIT_PERIOD_SECONDS, String.valueOf(AUDIT_PERIOD_SEC));
TestTime time = new TestTime();
testTime.set(time);
// Clean up the DB
try (EntityTransCloser etc = new EntityTransCloser(em.getTransaction())) {
EntityTransaction et = etc.getTransation();
em.createQuery("Delete from IntegrityAuditEntity").executeUpdate();
// commit transaction
et.commit();
}
}
/**
* Cleans up after a test, removing any ExtractAppenders from the logger and stopping any
* AuditThreads.
*/
protected void tearDown() {
for (LogApp p : appenders) {
p.detach();
}
for (MyIntegrityAudit p : auditors) {
p.stopAuditThread();
}
}
/**
* @return the {@link TestTime} in use by this thread
*/
public static TestTime getTestTime() {
return testTime.get();
}
/**
* Truncate the table.
*
* @param properties the properties
* @param persistenceUnit the persistence unit
* @param tableName the name of the table
*/
public void truncateTable(Properties properties, String persistenceUnit, String tableName) {
try (EntityMgrFactoryCloser emfc =
new EntityMgrFactoryCloser(Persistence.createEntityManagerFactory(persistenceUnit, properties));
EntityMgrCloser emc = new EntityMgrCloser(emfc.getFactory().createEntityManager());
EntityTransCloser etc = new EntityTransCloser(emc.getManager().getTransaction())) {
EntityManager em = emc.getManager();
EntityTransaction et = etc.getTransation();
// Clean up the DB
em.createQuery("Delete from " + tableName).executeUpdate();
// commit transaction
et.commit();
}
}
/**
* Verifies that items appear within the log, in order. A given item may appear more than once.
* In addition, the log may contain extra items; those are ignored.
*
* @param textre regular expression used to extract an item from a line in the log. The first
* "capture" group of the regular expression is assumed to contain the extracted item
* @param items items that should be matched by the items extracted from the log, in order
* @throws IOException if an IO error occurs
* @throws AssertionError if the desired items were not all found
*/
protected void verifyItemsInLog(ExtractAppender app, String... items) throws IOException {
Iterator it = new ArrayList<>(Arrays.asList(items)).iterator();
if (!it.hasNext()) {
return;
}
String expected = it.next();
String last = null;
for (String extractedText : app.getExtracted()) {
if (extractedText.equals(expected)) {
if (!it.hasNext()) {
// matched all of the items
return;
}
last = expected;
expected = it.next();
} else if (!extractedText.equals(last)) {
List remaining = getRemaining(expected, it);
fail("missing items " + remaining + ", but was: " + extractedText);
}
}
List remaining = getRemaining(expected, it);
assertTrue("missing items " + remaining, remaining.isEmpty());
}
/**
* Gets the remaining items from an iterator.
*
* @param current the current item, to be included within the list
* @param it iterator from which to get the remaining items
* @return a list of the remaining items
*/
private LinkedList getRemaining(String current, Iterator it) {
LinkedList remaining = new LinkedList<>();
remaining.add(current);
while (it.hasNext()) {
remaining.add(it.next());
}
return remaining;
}
/**
* Waits for a thread to stop. If the thread doesn't complete in the allotted time, then it
* interrupts it and waits again.
*
* @param auditor the thread for which to wait
* @return {@code true} if the thread stopped, {@code false} otherwise
*/
public boolean waitThread(MyIntegrityAudit auditor) {
if (auditor != null) {
try {
auditor.interrupt();
if (!auditor.joinAuditThread(WAIT_MS)) {
System.out.println("failed to stop audit thread");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return true;
}
/**
* Makes a new auditor.
*
* @param resourceName2 the name of the resource
* @param persistenceUnit2 the persistence unit
* @return a new auditor
* @throws Exception if an error occurs
*/
protected MyIntegrityAudit makeAuditor(String resourceName2, String persistenceUnit2) throws Exception {
// each auditor gets its own notion of time
TestTime time = new TestTime();
// use the auditor-specific time while this thread constructs things
testTime.set(time);
return new MyIntegrityAudit(resourceName2, persistenceUnit2, makeProperties(), time);
}
/**
* Watches for patterns in a logger by attaching a ExtractAppender to it.
*
* @param logger the logger to watch
* @param regex regular expression used to extract relevant text
* @return a new appender
*/
protected ExtractAppender watch(Logger logger, String regex) {
ExtractAppender app = new ExtractAppender(regex);
appenders.add(new LogApp(logger, app));
return app;
}
/**
* Makes a new Property set that's a clone of {@link #properties}.
*
* @return a new Property set containing all of a copy of all of the {@link #properties}
*/
protected static Properties makeProperties() {
Properties props = new Properties();
props.putAll(properties);
return props;
}
/**
* Waits for data to become stale and then runs an audit on several auditors in parallel.
*
* @param auditors the auditors
* @throws InterruptedException if a thread is interrupted
*/
protected void waitStaleAndRun(MyIntegrityAudit... auditors) throws InterruptedException {
waitStale();
runAudit(auditors);
}
/**
* Runs an audit on several auditors in parallel.
*
* @param auditors the auditors
* @throws InterruptedException if a thread is interrupted
*/
protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException {
// start an audit cycle on each auditor
List semaphores = new ArrayList<>(auditors.length);
for (MyIntegrityAudit p : auditors) {
semaphores.add(p.startAudit());
}
// wait for each auditor to complete its cycle
for (Semaphore sem : semaphores) {
waitSem(sem);
}
}
/**
* Waits for a semaphore to be released.
*
* @param sem the semaphore for which to wait
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the semaphore did not reach zero in the allotted time
*/
protected void waitSem(Semaphore sem) throws InterruptedException {
assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
}
/**
* Sleep a bit so that the currently designated pdp becomes stale.
*
* @throws InterruptedException if the thread is interrupted
*/
protected void waitStale() throws InterruptedException {
// waits for ALL auditors to become stale, as each has its own timer
for (MyIntegrityAudit auditor : auditors) {
auditor.sleep(AuditThread.AUDIT_COMPLETION_INTERVAL * AuditThread.AUDIT_RESET_CYCLES + 1);
}
}
/**
* Tracks which appender has been added to a logger.
*/
private static class LogApp {
private final Logger logger;
private final ExtractAppender appender;
public LogApp(Logger logger, ExtractAppender appender) {
this.logger = logger;
this.appender = appender;
logger.addAppender(appender);
appender.start();
}
public void detach() {
logger.detachAppender(appender);
}
}
/**
* Manages audits by inserting semaphores into a queue for the AuditThread to count.
*/
protected class MyIntegrityAudit extends IntegrityAudit {
private final TestTime myTime;
/**
* Semaphore on which the audit thread should wait.
*/
private Semaphore auditSem = null;
/**
* Semaphore on which the junit management thread should wait.
*/
private Semaphore junitSem = null;
/**
* Constructs an auditor and starts the AuditThread.
*
* @param resourceName the resource name
* @param persistenceUnit the persistence unit
* @param properties the properties
* @param time
* @throws Exception if an error occurs
*/
public MyIntegrityAudit(String resourceName, String persistenceUnit, Properties properties, TestTime time) throws Exception {
super(resourceName, persistenceUnit, properties);
myTime = time;
testTime.set(myTime);
auditors.add(this);
startAuditThread();
}
/**
* @return the "current" time for the auditor
*/
public long getTimeInMillis() {
return myTime.getMillis();
}
/**
* Sleeps for a period of time.
*
* @param sleepMs
* @throws InterruptedException
*/
public void sleep(long sleepMs) throws InterruptedException {
myTime.sleep(sleepMs);
}
/**
* Interrupts the AuditThread.
*/
public void interrupt() {
super.stopAuditThread();
}
/**
* Triggers an audit by releasing the audit thread's semaphore.
*
* @return the semaphore on which to wait
* @throws InterruptedException if the thread is interrupted
*/
public Semaphore startAudit() throws InterruptedException {
auditSem.release();
return junitSem;
}
/**
* Starts a new AuditThread. Creates a new pair of semaphores and associates them
* with the thread.
*/
@Override
public final void startAuditThread() throws IntegrityAuditException {
if (auditSem != null) {
// release a bunch of semaphores, in case a thread is still running
auditSem.release(1000);
}
auditSem = new Semaphore(0);
junitSem = new Semaphore(0);
super.startAuditThread();
if (haveAuditThread()) {
// tell the thread it can run
auditSem.release();
// wait for the thread to start
try {
waitSem(junitSem);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IntegrityAuditException(e);
}
}
}
/**
* Stops the AuditThread and waits for it to stop.
*
* @throws AssertionError if the thread is still running
*/
@Override
public void stopAuditThread() {
super.stopAuditThread();
assertTrue(waitThread(this));
}
@Override
protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
int integrityAuditPeriodSeconds2) throws IntegrityAuditException {
// make sure we're still using the auditor's time while we construct things
testTime.set(myTime);
return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodSeconds2, this) {
private Semaphore auditSem = MyIntegrityAudit.this.auditSem;
private Semaphore junitSem = MyIntegrityAudit.this.junitSem;
@Override
public void run() {
// make sure our thread uses this auditor's time
testTime.set(myTime);
super.run();
}
@Override
public void runStarted() throws InterruptedException {
auditSem.acquire();
junitSem.release();
auditSem.acquire();
}
@Override
public void auditCompleted() throws InterruptedException {
junitSem.release();
auditSem.acquire();
}
};
}
}
}