diff options
Diffstat (limited to 'integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java')
-rw-r--r-- | integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java | 3814 |
1 files changed, 1908 insertions, 1906 deletions
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java index 880d39f5..c32a2213 100644 --- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java +++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java @@ -54,1917 +54,1919 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * IntegrityMonitor Main class for monitoring the integrity of a resource and - * managing its state. State management follows the X.731 ITU standard. + * IntegrityMonitor Main class for monitoring the integrity of a resource and managing its state. + * State management follows the X.731 ITU standard. */ public class IntegrityMonitor { - private static final Logger logger = LoggerFactory.getLogger(IntegrityMonitor.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(IntegrityMonitor.class.getName()); - // only allow one instance of IntegrityMonitor - private static IntegrityMonitor instance = null; + // only allow one instance of IntegrityMonitor + private static IntegrityMonitor instance = null; - private static String resourceName = null; - boolean alarmExists = false; + private static String resourceName = null; + boolean alarmExists = false; - /* - * Error message that is written by the dependencyCheck() method. It is made - * available externally through the evaluateSanity() method. - */ - private String dependencyCheckErrorMsg = ""; + /* + * Error message that is written by the dependencyCheck() method. It is made available + * externally through the evaluateSanity() method. + */ + private String dependencyCheckErrorMsg = ""; - // The entity manager factory for JPA access - private EntityManagerFactory emf; - private EntityManager em; + // The entity manager factory for JPA access + private EntityManagerFactory emf; + private EntityManager em; - // Persistence Unit for JPA - public static final String PERSISTENCE_UNIT = "operationalPU"; - - private static String persistenceUnit = PERSISTENCE_UNIT; - - private static final long CYCLE_INTERVAL_MILLIS = 1000l; - - private static long cycleIntervalMillis = CYCLE_INTERVAL_MILLIS; - - /** - * Units used for intervals extracted from the properties, which are - * typically given in seconds. - */ - private static TimeUnit propertyUnits = TimeUnit.SECONDS; - - private StateManagement stateManager = null; - - private FPManager fpManager = null; - - // The forward progress counter is incremented as the - // process being monitored makes forward progress - private int fpCounter = 0; - private int lastFpCounter = 0; - - // elapsed time since last FP counter check - private long elapsedTime = 0; - - // elapsed time since last test transaction check - private long elapsedTestTransTime = 0; - - // elapsed time since last write Fpc check - private long elapsedWriteFpcTime = 0; - - // last dependency health check time. Initialize so that the periodic check - // starts after 60 seconds. - // This allows time for dependents to come up. - private long lastDependencyCheckTime = System.currentTimeMillis(); - - // Time of the last state audit. It is initialized at the time of the IM - // construction - private Date lastStateAuditTime = new Date(); - - // Interval between state audits in ms. We leave it turned off by default so - // that it will only - // be run on the nodes which we want doing the audit. In particular, we only - // want it to run - // on the droolspdps - private static long stateAuditIntervalMs = 0L; - - // the number of cycles since 'fpCounter' was last changed - private int missedCycles = 0; - - // forward progress monitoring interval - private static long monitorIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_MONITOR_INTERVAL; - // The number of periods the counter fails to increment before an alarm is - // raised. - private static int failedCounterThreshold = IntegrityMonitorProperties.DEFAULT_FAILED_COUNTER_THRESHOLD; - // test transaction interval - private static long testTransIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_TEST_INTERVAL; - // write Fpc to DB interval - private static long writeFpcIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_WRITE_FPC_INTERVAL; - // check the health of dependencies - private static long checkDependencyIntervalMs = 1000L - * IntegrityMonitorProperties.DEFAULT_CHECK_DEPENDENCY_INTERVAL; - - // A lead subsystem will have dependency groups with resource names in the - // properties file. - // For non-lead subsystems, the dependency_group property will be absent. - private static String[] depGroups = null; - - private static boolean isUnitTesting = false; - - // can turn on health checking of dependents via jmx test() call by setting - // this property to true - private static boolean testViaJmx = false; - - private static String jmxFqdn = null; - - // this is the max interval allowed without any forward progress - // counter updates - private static long maxFpcUpdateIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_MAX_FPC_UPDATE_INTERVAL; - - // Node types - private enum NodeType { - PDP_XACML, PDP_DROOLS, PAP, PAP_ADMIN, LOGPARSER, BRMS_GATEWAY, ASTRA_GATEWAY, ELK_SERVER, PYPDP - - } - - private static String siteName; - private static String nodeType; - private Date refreshStateAuditLastRunDate; - private static long refreshStateAuditIntervalMs = 600000; // run it once per - // 10 minutes - - // lock objects - private final Object evaluateSanityLock = new Object(); - private final Object fpMonitorCycleLock = new Object(); - private final Object dependencyCheckLock = new Object(); - private final Object testTransactionLock = new Object(); - private final Object startTransactionLock = new Object(); - private final Object endTransactionLock = new Object(); - private final Object checkTestTransactionLock = new Object(); - private final Object checkWriteFpcLock = new Object(); - private static final Object getInstanceLock = new Object(); - private final Object refreshStateAuditLock = new Object(); - private final Object imFlushLock = new Object(); - - private Map<String, String> allSeemsWellMap; - private Map<String, String> allNotWellMap; - - /** - * IntegrityMonitor constructor. It is invoked from the getInstance() method - * in this class or from the constructor of a child or sub-class. A class - * can extend the IntegrityMonitor class if there is a need to override any - * of the base methods (ex. subsystemTest()). Only one instance is allowed - * to be created per resource name. - * - * @param resourceName - * The resource name of the resource - * @param properties - * a set of properties passed in from the resource - * @throws IntegrityMonitorException - * if any errors are encountered in the constructor - */ - protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException { - - this(resourceName, properties, null); - } - - /** - * IntegrityMonitor constructor. It is invoked from the getInstance() method - * in this class or from the constructor of a child or sub-class. A class - * can extend the IntegrityMonitor class if there is a need to override any - * of the base methods (ex. subsystemTest()). Only one instance is allowed - * to be created per resource name. - * - * @param resourceName - * The resource name of the resource - * @param properties - * a set of properties passed in from the resource - * @param queue - * queue to use to control the FPManager thread, or {@code null} - * @throws IntegrityMonitorException - * if any errors are encountered in the constructor - */ - protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue) - throws IntegrityMonitorException { - - // singleton check since this constructor can be called from a child or - // sub-class - if (instance != null) { - String msg = "IM object exists and only one instance allowed"; - logger.error("{}", msg); - throw new IntegrityMonitorException("IntegrityMonitor constructor exception: " + msg); - } - instance = this; - - IntegrityMonitor.resourceName = resourceName; - - /* - * Validate that the properties file contains all the needed properties. - * Throws an IntegrityMonitorPropertiesException - */ - validateProperties(properties); - - // construct jmx url - String jmxUrl = getJmxUrl(); - - // - // Create the entity manager factory - // - emf = Persistence.createEntityManagerFactory(persistenceUnit, properties); - // - // Did it get created? - // - if (emf == null) { - logger.error("Error creating IM entity manager factory with persistence unit: {}", persistenceUnit); - throw new IntegrityMonitorException("Unable to create IM Entity Manager Factory"); - } - - // add entry to forward progress and resource registration tables in DB - - // Start a transaction - em = emf.createEntityManager(); - EntityTransaction et = em.getTransaction(); - - et.begin(); - - try { - // if ForwardProgress entry exists for resourceName, update it. If - // not found, create a new entry - Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); - fquery.setParameter("rn", resourceName); - - @SuppressWarnings("rawtypes") - List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ForwardProgressEntity fpx = null; - if (!fpList.isEmpty()) { - // ignores multiple results - fpx = (ForwardProgressEntity) fpList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(fpx); - if (logger.isDebugEnabled()) { - logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName, - fpx.getFpcCount(), fpx.getLastUpdated()); - } - fpx.setFpcCount(fpCounter); - } else { - // Create a forward progress object - logger.debug("Adding resource {} to ForwardProgress table", resourceName); - fpx = new ForwardProgressEntity(); - } - // update/set columns in entry - fpx.setResourceName(resourceName); - em.persist(fpx); - // flush to the DB - synchronized (imFlushLock) { - em.flush(); - } - - // if ResourceRegistration entry exists for resourceName, update it. - // If not found, create a new entry - Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); - rquery.setParameter("rn", resourceName); - - @SuppressWarnings("rawtypes") - List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ResourceRegistrationEntity rrx = null; - if (!rrList.isEmpty()) { - // ignores multiple results - rrx = (ResourceRegistrationEntity) rrList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(rrx); - if (logger.isDebugEnabled()) { - logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName, - rrx.getResourceUrl(), rrx.getCreatedDate()); - } - rrx.setLastUpdated(new Date()); - } else { - // register resource by adding entry to table in DB - logger.debug("Adding resource {} to ResourceRegistration table", resourceName); - rrx = new ResourceRegistrationEntity(); - } - // update/set columns in entry - rrx.setResourceName(resourceName); - rrx.setResourceUrl(jmxUrl); - rrx.setNodeType(nodeType); - rrx.setSite(siteName); - em.persist(rrx); - // flush to the DB - synchronized (imFlushLock) { - et.commit(); - } - - } catch (Exception e) { - logger.error("IntegrityMonitor constructor DB table update failed with exception: ", e); - try { - if (et.isActive()) { - synchronized (imFlushLock) { - et.rollback(); - } - } - } catch (Exception e1) { - logger.error("IntegrityMonitor constructor threw exception: ", e1); - } - throw e; - } - - try { - // create instance of StateMangement class and pass emf to it - stateManager = new StateManagement(emf, resourceName); - - /** - * Initialize the state and status attributes. This will maintain any - * Administrative state value but will set the operational state = - * enabled, availability status = null, standby status = null. The - * integrity monitor will set the operational state via the FPManager - * and the owning application must set the standby status by calling - * promote/demote on the StateManager. - */ - stateManager.initializeState(); - - } catch(StateManagementException e) { - throw new IntegrityMonitorException(e); - } - - // create management bean - try { - new ComponentAdmin(resourceName, this, stateManager); - } catch (Exception e) { - logger.error("ComponentAdmin constructor exception: {}", e.toString(), e); - } - - fpManager = new FPManager(queue); - - } - - /** - * Get an instance of IntegrityMonitor for a given resource name. It creates - * one if it does not exist. Only one instance is allowed to be created per - * resource name. - * - * @param resourceName - * The resource name of the resource - * @param properties - * a set of properties passed in from the resource - * @return The new instance of IntegrityMonitor - * @throws IntegrityMonitorException - * if unable to create jmx url or the constructor returns an - * exception - */ - public static IntegrityMonitor getInstance(String resourceName, Properties properties) throws IntegrityMonitorException { - return getInstance(resourceName, properties, null); - } - - /** - * Get an instance of IntegrityMonitor for a given resource name. It creates - * one if it does not exist. Only one instance is allowed to be created per - * resource name. - * - * @param resourceName - * The resource name of the resource - * @param properties - * a set of properties passed in from the resource - * @param queue - * queue to use to control the FPManager thread, or {@code null} - * @return The new instance of IntegrityMonitor - * @throws IntegrityMonitorException - * if unable to create jmx url or the constructor returns an - * exception - */ - protected static IntegrityMonitor getInstance(String resourceName, Properties properties, - BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException { - - synchronized (getInstanceLock) { - logger.debug("getInstance() called - resourceName= {}", resourceName); - if (resourceName == null || resourceName.isEmpty() || properties == null) { - logger.error("Error: getIntegrityMonitorInstance() called with invalid input"); - return null; - } - - if (instance == null) { - logger.debug("Creating new instance of IntegrityMonitor"); - instance = new IntegrityMonitor(resourceName, properties, queue); - } - return instance; - } - } - - public static IntegrityMonitor getInstance() throws IntegrityMonitorException { - logger.debug("getInstance() called"); - if (instance == null) { - String msg = "No IntegrityMonitor instance exists." - + " Please use the method IntegrityMonitor.getInstance(String resourceName, Properties properties)"; - throw new IntegrityMonitorPropertiesException(msg); - } else { - return instance; - } - } - - /* - * This is a facility used by JUnit testing to destroy the IntegrityMonitor - * instance before creating a new one. It waits a bit to allow the FPManager - * to fully exit. - */ - public static void deleteInstance() throws IntegrityMonitorException { - logger.debug("deleteInstance() called"); - synchronized (getInstanceLock) { - if (isUnitTesting() && instance != null && instance.getFPManager() != null) { - FPManager fpm = instance.getFPManager(); - - // Stop the FPManager thread - fpm.stopAndExit(); - - try { - // Make sure it has exited - fpm.join(2000L); - } catch (InterruptedException e) { - logger.error("deleteInstance: Interrupted while waiting for FPManaager to fully exit", e); - Thread.currentThread().interrupt(); - } - - if (fpm.isAlive()) { - logger.error("IntegrityMonitor.deleteInstance() Failed to kill FPManager thread"); - throw new IntegrityMonitorException( - "IntegrityMonitor.deleteInstance() Failed to kill FPManager thread"); - } - - instance = null; - } - } - logger.debug("deleteInstance() exit"); - } - - private FPManager getFPManager() { - return fpManager; - } - - private static String getJmxUrl() throws IntegrityMonitorException { - - // get the jmx remote port and construct the JMX URL - Properties systemProps = System.getProperties(); - String jmxPort = systemProps.getProperty("com.sun.management.jmxremote.port"); - String jmxErrMsg; - if (jmxPort == null) { - jmxErrMsg = "System property com.sun.management.jmxremote.port for JMX remote port is not set"; - logger.error("{}", jmxErrMsg); - throw new IntegrityMonitorException("getJmxUrl exception: " + jmxErrMsg); - } - - int port = 0; - try { - port = Integer.parseInt(jmxPort); - } catch (NumberFormatException e) { - jmxErrMsg = "JMX remote port is not a valid integer value - " + jmxPort; - logger.error("{}", jmxErrMsg); - throw new IntegrityMonitorException("getJmxUrl exception: " + jmxErrMsg); - } - - try { - if (jmxFqdn == null) { - jmxFqdn = InetAddress.getLocalHost().getCanonicalHostName(); // get - // FQDN - // of - // this - // host - } - } catch (Exception e) { - String msg = "getJmxUrl could not get hostname"; - logger.error("{}", msg, e); - throw new IntegrityMonitorException("getJmxUrl Exception: " + msg); - } - if (jmxFqdn == null) { - String msg = "getJmxUrl encountered null hostname"; - logger.error("{}", msg); - throw new IntegrityMonitorException("getJmxUrl error: " + msg); - } - - // assemble the jmx url - String jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxFqdn + ":" + port + "/jmxrmi"; - - logger.debug("IntegerityMonitor - jmx url={}", jmxUrl); - - return jmxUrl; - } - - /** - * evaluateSanity() is designed to be called by an external entity to - * evealuate the sanity of the node. It checks the operational and - * administrative states and the standby status. If the operational state is - * disabled, it will include the dependencyCheckErrorMsg which includes - * information about any dependency (node) which has failed. - */ - public void evaluateSanity() throws IntegrityMonitorException { - logger.debug("evaluateSanity called ...."); - synchronized (evaluateSanityLock) { - - String errorMsg = dependencyCheckErrorMsg; - logger.debug("evaluateSanity dependencyCheckErrorMsg = {}", errorMsg); - // check op state and throw exception if disabled - if ((stateManager.getOpState() != null) && stateManager.getOpState().equals(StateManagement.DISABLED)) { - String msg = "Resource " + resourceName + " operation state is disabled. " + errorMsg; - logger.debug("{}", msg); - throw new IntegrityMonitorException(msg); - } - - // check admin state and throw exception if locked - if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { - String msg = "Resource " + resourceName + " is administratively locked"; - logger.debug("{}", msg); - throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); - } - // check standby state and throw exception if cold standby - if ((stateManager.getStandbyStatus() != null) - && stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY)) { - String msg = "Resource " + resourceName + " is cold standby"; - logger.debug("{}", msg); - throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); - } - - } - - } - - /* - * This method checks the forward progress counter and the state of a - * dependency. If the dependency is unavailable or failed, an error message - * is created which is checked when evaluateSanity interface is called. If - * the error message is set then the evaluateSanity will return an error. - */ - public String stateCheck(String dep) { - logger.debug("checking state of dependent resource: {}", dep); - String errorMsg = null; - ForwardProgressEntity forwardProgressEntity = null; - StateManagementEntity stateManagementEntity = null; - - // Start a transaction - EntityTransaction et = em.getTransaction(); - et.begin(); - - try { - Query query = em.createQuery("Select p from ForwardProgressEntity p where p.resourceName=:resource"); - query.setParameter("resource", dep); - - @SuppressWarnings("rawtypes") - List fpList = query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - - if (!fpList.isEmpty()) { - // exists - forwardProgressEntity = (ForwardProgressEntity) fpList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(forwardProgressEntity); - logger.debug("Found entry in ForwardProgressEntity table for dependent Resource={}", dep); - } else { - errorMsg = dep + ": resource not found in ForwardProgressEntity database table"; - logger.error("{}", errorMsg); - } - synchronized (imFlushLock) { - et.commit(); - } - } catch (Exception ex) { - // log an error - errorMsg = dep + ": ForwardProgressEntity DB operation failed with exception: "; - logger.error("{}", errorMsg, ex); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - - if (errorMsg == null) { - // Start a transaction - et = em.getTransaction(); - et.begin(); - try { - // query if StateManagement entry exists for dependent resource - Query query = em.createQuery("Select p from StateManagementEntity p where p.resourceName=:resource"); - query.setParameter("resource", dep); - - @SuppressWarnings("rawtypes") - List smList = query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - if (!smList.isEmpty()) { - // exist - stateManagementEntity = (StateManagementEntity) smList.get(0); - // refresh the object from DB in case cached data was - // returned - em.refresh(stateManagementEntity); - logger.debug("Found entry in StateManagementEntity table for dependent Resource={}", dep); - } else { - errorMsg = dep + ": resource not found in state management entity database table"; - logger.error("{}", errorMsg); - } - - synchronized (imFlushLock) { - et.commit(); - } - } catch (Exception e) { - // log an error - errorMsg = dep + ": StateManagementEntity DB read failed with exception: "; - logger.error("{}", errorMsg, e); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - } - - // verify that the ForwardProgress is current (check last_updated) - if (errorMsg == null) { - if (forwardProgressEntity != null && stateManagementEntity != null) { - Date date = new Date(); - long diffMs = date.getTime() - forwardProgressEntity.getLastUpdated().getTime(); - logger.debug("IntegrityMonitor.stateCheck(): diffMs = {}", diffMs); - - // Threshold for a stale entry - long staleMs = maxFpcUpdateIntervalMs; - logger.debug("IntegrityMonitor.stateCheck(): staleMs = {}", staleMs); - - if (diffMs > staleMs) { - // ForwardProgress is stale. Disable it - try { - if (!stateManagementEntity.getOpState().equals(StateManagement.DISABLED)) { - logger.debug("IntegrityMonitor.stateCheck(): Changing OpStat = disabled for {}", dep); - stateManager.disableFailed(dep); - } - } catch (Exception e) { - String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep - + "; " + e.getMessage(); - logger.error("{}", msg, e); - } - } - } else { - - if (forwardProgressEntity == null) { - String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep - + "; " + " forwardProgressEntity == null."; - logger.error("{}", msg); - } - - else { - String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep - + "; " + " stateManagementEntity == null."; - logger.error("{}", msg); - } - } - } - - // check operation, admin and standby states of dependent resource - if (errorMsg == null) { - if (stateManagementEntity != null) { - if ((stateManager.getAdminState() != null) - && stateManagementEntity.getAdminState().equals(StateManagement.LOCKED)) { - errorMsg = dep + ": resource is administratively locked"; - logger.error("{}", errorMsg); - } else if ((stateManager.getOpState() != null) - && stateManagementEntity.getOpState().equals(StateManagement.DISABLED)) { - errorMsg = dep + ": resource is operationally disabled"; - logger.error("{}", errorMsg); - } else if ((stateManager.getStandbyStatus() != null) - && stateManagementEntity.getStandbyStatus().equals(StateManagement.COLD_STANDBY)) { - errorMsg = dep + ": resource is cold standby"; - logger.error("{}", errorMsg); - } - } else { - errorMsg = dep + ": could not check standy state of resource. stateManagementEntity == null."; - logger.error("{}", errorMsg); - } - } - - String returnMsg = "IntegrityMonitor.stateCheck(): returned error_msg: " + errorMsg; - logger.debug("{}", returnMsg); - return errorMsg; - } - - private String fpCheck(String dep) { - logger.debug("checking forward progress count of dependent resource: {}", dep); - - String errorMsg = null; - - // check FPC count - a changing FPC count indicates the resource JVM is - // running - - // Start a transaction - EntityTransaction et = em.getTransaction(); - et.begin(); - try { - Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); - fquery.setParameter("rn", dep); - - @SuppressWarnings("rawtypes") - List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ForwardProgressEntity fpx; - if (!fpList.isEmpty()) { - // ignores multiple results - fpx = (ForwardProgressEntity) fpList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(fpx); - if (logger.isDebugEnabled()) { - logger.debug("Dependent resource {} - fpc= {}, lastUpdated={}", dep, fpx.getFpcCount(), - fpx.getLastUpdated()); - } - long currTime = System.currentTimeMillis(); - // if dependent resource FPC has not been updated, consider it - // an error - if ((currTime - fpx.getLastUpdated().getTime()) > maxFpcUpdateIntervalMs) { - errorMsg = dep + ": FP count has not been updated in the last " + maxFpcUpdateIntervalMs + "ms"; - logger.error("{}", errorMsg); - try { - // create instance of StateMangement class for dependent - StateManagement depStateManager = new StateManagement(emf, dep); - if (!depStateManager.getOpState().equals(StateManagement.DISABLED)) { - logger.debug( - "Forward progress not detected for dependent resource {}. Setting dependent's state to disable failed.", - dep); - depStateManager.disableFailed(); - } - } catch (Exception e) { - // ignore errors - logger.error("Update dependent state failed with exception: ", e); - } - } - } else { - // resource entry not found in FPC table - errorMsg = dep + ": resource not found in ForwardProgressEntity table in the DB"; - logger.error("{}", errorMsg); - } - synchronized (imFlushLock) { - et.commit(); - } - } catch (Exception e) { - // log an error and continue - errorMsg = dep + ": ForwardProgressEntity DB read failed with exception: "; - logger.error("{}", errorMsg, e); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - - return errorMsg; - } - - public List<ForwardProgressEntity> getAllForwardProgressEntity() { - logger.debug("getAllForwardProgressEntity: entry"); - ArrayList<ForwardProgressEntity> fpList = new ArrayList<>(); - // Start a transaction - EntityTransaction et = em.getTransaction(); - et.begin(); - try { - Query fquery = em.createQuery("Select e from ForwardProgressEntity e"); - @SuppressWarnings("rawtypes") - List myList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - synchronized (imFlushLock) { - et.commit(); - } - logger.debug("getAllForwardProgressEntity: myList.size(): {}", myList.size()); - if (!myList.isEmpty()) { - for (int i = 0; i < myList.size(); i++) { - if (logger.isDebugEnabled()) { - logger.debug("getAllForwardProgressEntity: myList.get({}).getResourceName(): {}", i, - ((ForwardProgressEntity) myList.get(i)).getResourceName()); - } - fpList.add((ForwardProgressEntity) myList.get(i)); - } - } - synchronized (imFlushLock) { - if (et.isActive()) { - et.commit(); - } - } - } catch (Exception e) { - // log an error and continue - String msg = "getAllForwardProgessEntity DB read failed with exception: "; - logger.error("{}", msg, e); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - return fpList; - } - - private String jmxCheck(String dep) { - logger.debug("checking health of dependent by calling test() via JMX on resource: {}", dep); - - String errorMsg = null; - - // get the JMX URL from the database - String jmxUrl = null; - // Start a transaction - EntityTransaction et = em.getTransaction(); - et.begin(); - try { - // query if ResourceRegistration entry exists for resourceName - Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); - rquery.setParameter("rn", dep); - - @SuppressWarnings("rawtypes") - List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ResourceRegistrationEntity rrx = null; - - if (!rrList.isEmpty()) { - // ignores multiple results - rrx = (ResourceRegistrationEntity) rrList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(rrx); - jmxUrl = rrx.getResourceUrl(); - if (logger.isDebugEnabled()) { - logger.debug("Dependent Resource={}, url={}, createdDate={}", dep, jmxUrl, rrx.getCreatedDate()); - } - } else { - errorMsg = dep + ": resource not found in ResourceRegistrationEntity table in the DB"; - logger.error("{}", errorMsg); - } - - synchronized (imFlushLock) { - et.commit(); - } - } catch (Exception e) { - errorMsg = dep + ": ResourceRegistrationEntity DB read failed with exception: "; - logger.error("{}", errorMsg, e); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - - if (jmxUrl != null) { - JmxAgentConnection jmxAgentConnection = null; - try { - jmxAgentConnection = new JmxAgentConnection(jmxUrl); - MBeanServerConnection mbeanServer = jmxAgentConnection.getMBeanConnection(); - ComponentAdminMBean admin = JMX.newMXBeanProxy(mbeanServer, ComponentAdmin.getObjectName(dep), - ComponentAdminMBean.class); - - // invoke the test method via the jmx proxy - admin.test(); - logger.debug("Dependent resource {} sanity test passed", dep); - } catch (Exception e) { - errorMsg = dep + ": resource sanity test failed with exception: "; - logger.error("{}", errorMsg, e); - } finally { - // close the JMX connector - if (jmxAgentConnection != null) { - jmxAgentConnection.disconnect(); - } - } - } - - return errorMsg; - } - - public String dependencyCheck() { - logger.debug("dependencyCheck: entry"); - synchronized (dependencyCheckLock) { - - // Start with the error message empty - String errorMsg = ""; - boolean dependencyFailure = false; - - /* - * Before we check dependency groups we need to check subsystemTest. - */ - try { - // Test any subsystems that are not covered under the dependency - // relationship - subsystemTest(); - } catch (Exception e) { - logger.error("IntegrityMonitor threw exception", e); - dependencyFailure = true; - // This indicates a subsystemTest failure - try { - if (logger.isDebugEnabled()) { - logger.debug( - "{}: There has been a subsystemTest failure with error:{} Updating this resource's state to disableDependency", - resourceName, e.getMessage()); - } - // Capture the subsystemTest failure info - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(","); - } - errorMsg = errorMsg.concat(resourceName + ": " + e.getMessage()); - this.stateManager.disableDependency(); - } catch (Exception ex) { - logger.error("IntegrityMonitor threw exception.", ex); - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(","); - } - errorMsg = errorMsg.concat("\n" + resourceName - + ": Failed to disable dependency after subsystemTest failure due to: " + ex.getMessage()); - } - } - - // Check the sanity of dependents for lead subcomponents - if (depGroups != null && depGroups.length > 0) { - // check state of resources in dependency groups - for (String group : depGroups) { - group = group.trim(); - if (group.isEmpty()) { - // ignore empty group - continue; - } - String[] dependencies = group.split(","); - if (logger.isDebugEnabled()) { - logger.debug("group dependencies = {}", Arrays.toString(dependencies)); - } - int realDepCount = 0; - int failDepCount = 0; - for (String dep : dependencies) { - dep = dep.trim(); - if (dep.isEmpty()) { - // ignore empty dependency - continue; - } - realDepCount++; // this is a valid dependency whose - // state is tracked - String failMsg = fpCheck(dep); // if a resource is - // down, its FP count - // will not be - // incremented - if (failMsg == null) { - if (testViaJmx) { - failMsg = jmxCheck(dep); - } else { - failMsg = stateCheck(dep); - } - } - if (failMsg != null) { - failDepCount++; - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(", "); - } - errorMsg = errorMsg.concat(failMsg); - } - } // end for (String dep : dependencies) - - // if all dependencies in a group are failed, set this - // resource's state to disable dependency - if ((realDepCount > 0) && (failDepCount == realDepCount)) { - dependencyFailure = true; - try { - logger.debug( - "All dependents in group {} have failed their health check. Updating this resource's state to disableDependency", - group); - if (stateManager.getAvailStatus() == null || !((stateManager.getAvailStatus()) - .equals(StateManagement.DEPENDENCY) - || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { - // Note: redundant calls are made by - // refreshStateAudit - this.stateManager.disableDependency(); - } - } catch (Exception e) { - logger.error("IntegrityMonitor threw exception.", e); - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(","); - } - errorMsg = errorMsg.concat(resourceName + ": Failed to disable dependency"); - break; // break out on failure and skip checking - // other groups - } - } - // check the next group - - } // end for (String group : depGroups) - - /* - * We have checked all the dependency groups. If all are ok and - * subsystemTest passed, dependencyFailure == false - */ - if (!dependencyFailure) { - try { - logger.debug( - "All dependency groups have at least one viable member. Updating this resource's state to enableNoDependency"); - if (stateManager.getAvailStatus() != null - && ((stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) - || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { - // Note: redundant calls are made by - // refreshStateAudit - this.stateManager.enableNoDependency(); - } // The refreshStateAudit will catch the case where it - // is disabled but availStatus != failed - } catch (Exception e) { - logger.error("IntegrityMonitor threw exception.", e); - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(","); - } - errorMsg = errorMsg.concat(resourceName + ": Failed to enable no dependency"); - } - } - } else if (!dependencyFailure) { - /* - * This is put here to clean up when no dependency group should - * exist, but one was erroneously added which caused the state - * to be disabled/dependency/coldstandby and later removed. We - * saw this happen in the lab, but is not very likely in a - * production environment...but you never know. - */ - try { - logger.debug("There are no dependents. Updating this resource's state to enableNoDependency"); - if (stateManager.getAvailStatus() != null - && ((stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) - || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { - // Note: redundant calls are made by refreshStateAudit - this.stateManager.enableNoDependency(); - } // The refreshStateAudit will catch the case where it is - // disabled but availStatus != failed - } catch (Exception e) { - logger.error("IntegrityMonitor threw exception.", e); - if (!errorMsg.isEmpty()) { - errorMsg = errorMsg.concat(","); - } - errorMsg = errorMsg.concat(resourceName + ": Failed to enable no dependency"); - } - } - - if (!errorMsg.isEmpty()) { - logger.error("Sanity failure detected in a dependent resource: {}", errorMsg); - - } - - dependencyCheckErrorMsg = errorMsg; - lastDependencyCheckTime = System.currentTimeMillis(); - logger.debug("dependencyCheck: exit"); - return errorMsg; - } - } - - /** - * Execute a test transaction. It is called when the test transaction timer - * fires. It could be overridden to provide additional test functionality. - * If overridden, the overriding method must invoke startTransaction() and - * endTransaction() and check if the allNotWellMap is empty. - */ - public void testTransaction() { - synchronized (testTransactionLock) { - logger.debug("testTransaction: entry"); - // - // startTransaction() not required for testTransaction - // - - // end transaction - increments local FP counter - endTransaction(); - } - } - - /** - * Additional testing for subsystems that do not have a /test interface (for - * ex. 3rd party processes like elk). This method would be overridden by the - * subsystem. - */ - public void subsystemTest() throws IntegrityMonitorException { - // Testing provided by subsystem - logger.debug("IntegrityMonitor subsystemTest() OK"); - } - - /** - * Checks admin state and resets transaction timer. Called by application at - * the start of a transaction. - * - * @throws AdministrativeStateException - * throws admin state exception if resource is locked - * @throws StandbyStatusException - */ - public void startTransaction() throws IntegrityMonitorException { - - synchronized (startTransactionLock) { - // check admin state and throw exception if locked - if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { - String msg = "Resource " + resourceName + " is administratively locked"; - - throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); - } - // check standby state and throw exception if locked - - if ((stateManager.getStandbyStatus() != null) - && (stateManager.getStandbyStatus().equals(StateManagement.HOT_STANDBY) - || stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY))) { - String msg = "Resource " + resourceName + " is standby"; - - throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); - } - - // reset transactionTimer so it will not fire - elapsedTestTransTime = 0; - } - } - - /** - * Increment the local forward progress counter. Called by application at - * the end of each transaction (successful or not). - */ - public void endTransaction() { - synchronized (endTransactionLock) { - if (getAllNotWellMap() != null) { - if (!(getAllNotWellMap().isEmpty())) { - /* - * An entity has reported that it is not well. We must not - * allow the the forward progress counter to advance. - */ - String msg = "allNotWellMap:"; - for (Entry<String, String> entry : allNotWellMap.entrySet()) { - msg = msg.concat("\nkey = " + entry.getKey() + " msg = " + entry.getValue()); - } - logger.error("endTransaction: allNotWellMap is NOT EMPTY. Not advancing forward" - + "progress counter. \n{}\n", msg); - return; - } else { - if (logger.isDebugEnabled()) { - if (getAllSeemsWellMap() != null) { - if (!(getAllSeemsWellMap().isEmpty())) { - String msg = "allSeemsWellMap:"; - for (Entry<String, String> entry : allSeemsWellMap.entrySet()) { - msg = msg.concat("\nkey = " + entry.getKey() + " msg = " + entry.getValue()); - } - logger.debug( - "endTransaction: allNotWellMap IS EMPTY and allSeemsWellMap is NOT EMPTY. Advancing forward" - + "progress counter. \n{}\n", - msg); - } - } - } - } - } - // increment local FPC - fpCounter++; - } - } - - // update FP count in DB with local FP count - private void writeFpc() throws IntegrityMonitorException { - - // Start a transaction - EntityTransaction et = em.getTransaction(); - - if (!et.isActive()) { - et.begin(); - } - - try { - // query if ForwardProgress entry exists for resourceName - Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); - fquery.setParameter("rn", resourceName); - - @SuppressWarnings("rawtypes") - List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ForwardProgressEntity fpx; - if (!fpList.isEmpty()) { - // ignores multiple results - fpx = (ForwardProgressEntity) fpList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(fpx); - if (logger.isDebugEnabled()) { - logger.debug("Updating FP entry: Resource={}, fpcCount={}, lastUpdated={}, new fpcCount={}", - resourceName, fpx.getFpcCount(), fpx.getLastUpdated(), fpCounter); - } - fpx.setFpcCount(fpCounter); - em.persist(fpx); - // flush to the DB and commit - synchronized (imFlushLock) { - et.commit(); - } - } else { - // Error - FP entry does not exist - String msg = "FP entry not found in database for resource " + resourceName; - throw new IntegrityMonitorException(msg); - } - } catch (Exception e) { - try { - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } catch (Exception e1) { - logger.error("IntegrityMonitor threw exception.", e1); - } - logger.error("writeFpc DB table commit failed with exception: {}", e); - throw e; - } - } - - // retrieve state manager reference - public final StateManagement getStateManager() { - return this.stateManager; - } - - /** - * Read and validate properties - * - * @throws IntegrityMonitorPropertiesException - */ - private static void validateProperties(Properties prop) throws IntegrityMonitorPropertiesException { - - if (prop.getProperty(IntegrityMonitorProperties.DB_DRIVER) == null) { - String msg = IntegrityMonitorProperties.DB_DRIVER + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } - - if (prop.getProperty(IntegrityMonitorProperties.DB_URL) == null) { - String msg = IntegrityMonitorProperties.DB_URL + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } - - if (prop.getProperty(IntegrityMonitorProperties.DB_USER) == null) { - String msg = IntegrityMonitorProperties.DB_USER + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } - - if (prop.getProperty(IntegrityMonitorProperties.DB_PWD) == null) { - String msg = IntegrityMonitorProperties.DB_PWD + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } - - if (prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL) != null) { - try { - monitorIntervalMs = toMillis( - Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL).trim())); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.FP_MONITOR_INTERVAL, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD) != null) { - try { - failedCounterThreshold = Integer - .parseInt(prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD).trim()); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL) != null) { - try { - testTransIntervalMs = toMillis( - Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL).trim())); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.TEST_TRANS_INTERVAL, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL) != null) { - try { - writeFpcIntervalMs = toMillis( - Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL).trim())); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.WRITE_FPC_INTERVAL, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL) != null) { - try { - checkDependencyIntervalMs = toMillis(Integer - .parseInt(prop.getProperty(IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL).trim())); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL, e); - } - } - - // dependency_groups are a semi-colon separated list of groups - // each group is a comma separated list of resource names - // For ex. dependency_groups = site_1.pap_1,site_1.pap_2 ; site_1.pdp_1, - // site_1.pdp_2 - if (prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS) != null) { - try { - depGroups = prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS).split(";"); - if (logger.isDebugEnabled()) { - logger.debug("dependency groups property = {}", Arrays.toString(depGroups)); - } - } catch (Exception e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.DEPENDENCY_GROUPS, e); - } - } - - siteName = prop.getProperty(IntegrityMonitorProperties.SITE_NAME); - if (siteName == null) { - String msg = IntegrityMonitorProperties.SITE_NAME + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } else { - siteName = siteName.trim(); - } - - nodeType = prop.getProperty(IntegrityMonitorProperties.NODE_TYPE); - if (nodeType == null) { - String msg = IntegrityMonitorProperties.NODE_TYPE + " property is null"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } else { - nodeType = nodeType.trim(); - if (!isNodeTypeEnum(nodeType)) { - String msg = IntegrityMonitorProperties.NODE_TYPE + " property " + nodeType + " is invalid"; - logger.error("{}", msg); - throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX) != null) { - String jmxTest = prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX).trim(); - testViaJmx = Boolean.parseBoolean(jmxTest); - } - - if (prop.getProperty(IntegrityMonitorProperties.JMX_FQDN) != null) { - jmxFqdn = prop.getProperty(IntegrityMonitorProperties.JMX_FQDN).trim(); - if (jmxFqdn.isEmpty()) { - jmxFqdn = null; - } - } - - if (prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL) != null) { - try { - maxFpcUpdateIntervalMs = toMillis( - Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL).trim())); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS) != null) { - try { - stateAuditIntervalMs = Long - .parseLong(prop.getProperty(IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS)); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS, e); - } - } - - if (prop.getProperty(IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS) != null) { - try { - refreshStateAuditIntervalMs = Long - .parseLong(prop.getProperty(IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS)); - } catch (NumberFormatException e) { - logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS, - e); - } - } - - logger.debug("IntegrityMonitor.validateProperties(): Property values \n" + "maxFpcUpdateIntervalMs = {}\n", - maxFpcUpdateIntervalMs); - - return; - } - - public static void updateProperties(Properties newprop) { - if (isUnitTesting()) { - try { - validateProperties(newprop); - } catch (IntegrityMonitorPropertiesException e) { - logger.error("IntegrityMonitor threw exception.", e); - } - } else { - logger.debug("Update integrity monitor properties not allowed"); - } - } - - private static boolean isNodeTypeEnum(String nodeType) { - String upper = nodeType.toUpperCase(); - for (NodeType n : NodeType.values()) { - if (n.toString().equals(upper)) { - return true; - } - } - return false; - } - - /** - * Look for "Forward Progress" -- if the 'FPMonitor' is stalled for too - * long, the operational state is changed to 'Disabled', and an alarm is - * set. The state is restored when forward progress continues. - */ - private void fpMonitorCycle() { - logger.debug("fpMonitorCycle(): entry"); - synchronized (fpMonitorCycleLock) { - // monitoring interval checks - if (monitorIntervalMs <= 0) { - logger.debug("fpMonitorCycle(): disabled"); - elapsedTime = 0; - return; // monitoring is disabled - } - - elapsedTime = elapsedTime + cycleIntervalMillis; - if (elapsedTime < monitorIntervalMs) { - return; // monitoring interval not reached - } - - elapsedTime = 0; // reset elapsed time - - try { - if (fpCounter == lastFpCounter) { - // no forward progress - missedCycles += 1; - if (missedCycles >= failedCounterThreshold && !alarmExists) { - logger.debug("Forward progress not detected for resource {}. Setting state to disable failed.", - resourceName); - if (!(stateManager.getOpState()).equals(StateManagement.DISABLED)) { - // Note: The refreshStateAudit will make redundant - // calls - stateManager.disableFailed(); - } // The refreshStateAudit will catch the case where - // opStat = disabled and availState ! - // failed/dependency.failed - alarmExists = true; - } - } else { - // forward progress has occurred - lastFpCounter = fpCounter; - missedCycles = 0; - // set op state to enabled - logger.debug("Forward progress detected for resource {}. Setting state to enable not failed.", - resourceName); - if (!(stateManager.getOpState()).equals(StateManagement.ENABLED)) { - // Note: The refreshStateAudit will make redundant calls - stateManager.enableNotFailed(); - } // The refreshStateAudit will catch the case where - // opState=enabled and availStatus != null - alarmExists = false; - } - } catch (Exception e) { - // log error - logger.error("FP Monitor encountered error. ", e); - } - } - logger.debug("fpMonitorCycle(): exit"); - } - - /** - * Look for "Forward Progress" on other nodes. If they are not making - * forward progress, check their operational state. If it is not disabled, - * then disable them. - */ - private void stateAudit() { - logger.debug("IntegrityMonitor.stateAudit(): entry"); - if (stateAuditIntervalMs <= 0) { - logger.debug("IntegrityMonitor.stateAudit(): disabled"); - return; // stateAudit is disabled - } - - // Only run from nodes that are operational - if (stateManager.getOpState().equals(StateManagement.DISABLED)) { - logger.debug("IntegrityMonitor.stateAudit(): DISABLED. returning"); - return; - } - if (stateManager.getAdminState().equals(StateManagement.LOCKED)) { - logger.debug("IntegrityMonitor.stateAudit(): LOCKED. returning"); - return; - } - if (!stateManager.getStandbyStatus().equals(StateManagement.NULL_VALUE) - && stateManager.getStandbyStatus() != null) { - if (!stateManager.getStandbyStatus().equals(StateManagement.PROVIDING_SERVICE)) { - logger.debug("IntegrityMonitor.stateAudit(): NOT PROVIDING_SERVICE. returning"); - return; - } - } - - Date date = new Date(); - long timeSinceLastStateAudit = date.getTime() - lastStateAuditTime.getTime(); - if (timeSinceLastStateAudit < stateAuditIntervalMs) { - logger.debug("IntegrityMonitor.stateAudit(): Not time to run. returning"); - return; - } - - executeStateAudit(); - - lastStateAuditTime = date; - - logger.debug("IntegrityMonitor.stateAudit(): exit"); - }// end stateAudit() - - public void executeStateAudit() { - logger.debug("IntegrityMonitor.executeStateAudit(): entry"); - Date date = new Date(); - - // Get all entries in the forwardprogressentity table - List<ForwardProgressEntity> fpList = getAllForwardProgressEntity(); - - // Check if each forwardprogressentity entry is current - for (ForwardProgressEntity fpe : fpList) { - // If the this is my ForwardProgressEntity, continue - if (fpe.getResourceName().equals(IntegrityMonitor.resourceName)) { - continue; - } - // Make sure you are not getting a cached version - em.refresh(fpe); - long diffMs = date.getTime() - fpe.getLastUpdated().getTime(); - if (logger.isDebugEnabled()) { - logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, diffMs = {}", fpe.getResourceName(), - diffMs); - } - - // Threshold for a stale entry - long staleMs = maxFpcUpdateIntervalMs; - if (logger.isDebugEnabled()) { - logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, staleMs = {}", fpe.getResourceName(), - staleMs); - } - - if (diffMs > staleMs) { - // ForwardProgress is stale. Disable it - // Start a transaction - logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, FPC is stale. Disabling it"); - EntityTransaction et = em.getTransaction(); - et.begin(); - StateManagementEntity sme = null; - try { - // query if StateManagement entry exists for fpe resource - Query query = em - .createQuery("Select p from StateManagementEntity p where p.resourceName=:resource"); - query.setParameter("resource", fpe.getResourceName()); - - @SuppressWarnings("rawtypes") - List smList = query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT) - .getResultList(); - if (!smList.isEmpty()) { - // exists - sme = (StateManagementEntity) smList.get(0); - // refresh the object from DB in case cached data was - // returned - em.refresh(sme); - if (logger.isDebugEnabled()) { - logger.debug( - "IntegrityMonitor.executeStateAudit(): Found entry in StateManagementEntity table for Resource={}", - sme.getResourceName()); - } - } else { - String msg = "IntegrityMonitor.executeStateAudit(): " + fpe.getResourceName() - + ": resource not found in state management entity database table"; - logger.error("{}", msg); - } - synchronized (imFlushLock) { - et.commit(); - } - } catch (Exception e) { - // log an error - logger.error( - "IntegrityMonitor.executeStateAudit(): {}: StateManagementEntity DB read failed with exception: ", - fpe.getResourceName(), e); - synchronized (imFlushLock) { - if (et.isActive()) { - et.rollback(); - } - } - } - - if (sme != null && !sme.getOpState().equals(StateManagement.DISABLED)) { - if (logger.isDebugEnabled()) { - logger.debug("IntegrityMonitor.executeStateAudit(): Changing OpStat = disabled for {}", - sme.getResourceName()); - } - try { - stateManager.disableFailed(sme.getResourceName()); - } catch (Exception e) { - String msg = "IntegrityMonitor.executeStateAudit(): Failed to disable " + sme.getResourceName(); - logger.error("{}", msg, e); - } - } - } // end if(diffMs > staleMs) - } // end for(ForwardProgressEntity fpe : fpList) - logger.debug("IntegrityMonitor.executeStateAudit(): exit"); - } - - /** - * Execute a test transaction when test transaction interval has elapsed. - */ - private void checkTestTransaction() { - logger.debug("checkTestTransaction(): entry"); - synchronized (checkTestTransactionLock) { - - // test transaction timer checks - if (testTransIntervalMs <= 0) { - logger.debug("checkTestTransaction(): disabled"); - elapsedTestTransTime = 0; - return; // test transaction is disabled - } - - elapsedTestTransTime = elapsedTestTransTime + cycleIntervalMillis; - if (elapsedTestTransTime < testTransIntervalMs) { - return; // test transaction interval not reached - } - - elapsedTestTransTime = 0; // reset elapsed time - - // execute test transaction - testTransaction(); - } - logger.debug("checkTestTransaction(): exit"); - } - - /** - * Updates Fpc counter in database when write Fpc interval has elapsed. - */ - private void checkWriteFpc() { - logger.debug("checkWriteFpc(): entry"); - synchronized (checkWriteFpcLock) { - - // test transaction timer checks - if (writeFpcIntervalMs <= 0) { - logger.debug("checkWriteFpc(): disabled"); - elapsedWriteFpcTime = 0; - return; // write Fpc is disabled - } - - elapsedWriteFpcTime = elapsedWriteFpcTime + cycleIntervalMillis; - if (elapsedWriteFpcTime < writeFpcIntervalMs) { - return; // write Fpc interval not reached - } - - elapsedWriteFpcTime = 0; // reset elapsed time - - // write Fpc to database - try { - writeFpc(); - } catch (Exception e) { - logger.error("IntegrityMonitor threw exception.", e); - } - } - logger.debug("checkWriteFpc(): exit"); - } - - /** - * Execute a dependency health check periodically which also updates this - * resource's state. - */ - private void checkDependentHealth() { - logger.debug("checkDependentHealth: entry"); - if (checkDependencyIntervalMs <= 0) { - logger.debug("checkDependentHealth: disabled"); - return; // dependency monitoring is disabled - } - - long currTime = System.currentTimeMillis(); - logger.debug("checkDependentHealth currTime - lastDependencyCheckTime = {}", - currTime - lastDependencyCheckTime); - if ((currTime - lastDependencyCheckTime) > checkDependencyIntervalMs) { - // execute dependency check and update this resource's state - - dependencyCheck(); - } - logger.debug("checkDependentHealth: exit"); - } - - /* - * This is a simple refresh audit which is periodically run to assure that - * the states and status attributes are aligned and notifications are sent - * to any listeners. It is possible for state/status to get out of synch and - * notified systems to be out of synch due to database corruption (manual or - * otherwise) or because a node became isolated. - * - * When the operation (lock/unlock) is called, it will cause a re-evaluation - * of the state and send a notification to all registered observers. - */ - private void refreshStateAudit() { - logger.debug("refreshStateAudit(): entry"); - if (refreshStateAuditIntervalMs <= 0) { - // The audit is disabled - logger.debug("refreshStateAudit(): disabled"); - return; - } - executeRefreshStateAudit(); - logger.debug("refreshStateAudit(): exit"); - } - - public void executeRefreshStateAudit() { - logger.debug("executeRefreshStateAudit(): entry"); - synchronized (refreshStateAuditLock) { - logger.debug("refreshStateAudit: entry"); - Date now = new Date(); - long nowMs = now.getTime(); - long lastTimeMs = refreshStateAuditLastRunDate.getTime(); - logger.debug("refreshStateAudit: ms since last run = {}", nowMs - lastTimeMs); - - if ((nowMs - lastTimeMs) > refreshStateAuditIntervalMs) { - String adminState = stateManager.getAdminState(); - logger.debug("refreshStateAudit: adminState = {}", adminState); - if (adminState.equals(StateManagement.LOCKED)) { - try { - logger.debug("refreshStateAudit: calling lock()"); - stateManager.lock(); - } catch (Exception e) { - logger.error("refreshStateAudit: caught unexpected exception from stateManager.lock(): ", e); - } - } else {// unlocked - try { - logger.debug("refreshStateAudit: calling unlock()"); - stateManager.unlock(); - } catch (Exception e) { - logger.error("refreshStateAudit: caught unexpected exception from stateManager.unlock(): ", e); - } - } - refreshStateAuditLastRunDate = new Date(); - logger.debug("refreshStateAudit: exit"); - } - } - logger.debug("executeRefreshStateAudit(): exit"); - } - - /** - * The following nested class periodically performs the forward progress - * check, checks dependencies, does a refresh state audit and runs the - * stateAudit. - */ - class FPManager extends Thread { - private final CountDownLatch stopper = new CountDownLatch(1); - - private BlockingQueue<CountDownLatch> queue; - private CountDownLatch progressLatch = null; - - // Constructor - start FP manager thread - FPManager(BlockingQueue<CountDownLatch> queue) { - this.queue = queue; - // set now as the last time the refreshStateAudit ran - IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date(); - // start thread - this.start(); - } - - @Override - public void run() { - logger.debug("FPManager thread running"); - - try { - getLatch(); - decrementLatch(); - - while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) { - getLatch(); - IntegrityMonitor.this.runOnce(); - decrementLatch(); - } - - } catch (InterruptedException e) { - logger.debug("IntegrityMonitor threw exception.", e); - Thread.currentThread().interrupt(); - } - } - - public void stopAndExit() { - stopper.countDown(); - this.interrupt(); - } - - /** - * Gets the next latch from the queue. - * - * @throws InterruptedException - * - */ - private void getLatch() throws InterruptedException { - if (queue != null) { - progressLatch = queue.take(); - } - } - - /** - * Decrements the current latch. - */ - private void decrementLatch() { - if (progressLatch != null) { - progressLatch.countDown(); - } - } - - } - - private void runOnce() { - try { - logger.debug("FPManager calling fpMonitorCycle()"); - // check forward progress timer - fpMonitorCycle(); - - logger.debug("FPManager calling checkTestTransaction()"); - // check test transaction timer - checkTestTransaction(); - - logger.debug("FPManager calling checkWriteFpc()"); - // check write Fpc timer - checkWriteFpc(); - - logger.debug("FPManager calling checkDependentHealth()"); - // check dependency health - checkDependentHealth(); - - logger.debug("FPManager calling refreshStateAudit()"); - // check if it is time to run the refreshStateAudit - refreshStateAudit(); - - logger.debug("FPManager calling stateAudit()"); - // check if it is time to run the stateAudit - stateAudit(); - - } catch (Exception e) { - logger.error("Ignore FPManager thread processing timer(s) exception: ", e); - } - } - - public void allSeemsWell(@NotNull String key, @NotNull Boolean asw, @NotNull String msg) - throws AllSeemsWellException { - - logger.debug("allSeemsWell entry: key = {}, asw = {}, msg = {}", key, asw, msg); - if (key == null || key.isEmpty()) { - logger.error("allSeemsWell: 'key' has no visible content"); - throw new IllegalArgumentException("allSeemsWell: 'key' has no visible content"); - } - if (asw == null) { - logger.error("allSeemsWell: 'asw' is null"); - throw new IllegalArgumentException("allSeemsWell: 'asw' is null"); - } - if (msg == null || msg.isEmpty()) { - logger.error("allSeemsWell: 'msg' has no visible content"); - throw new IllegalArgumentException("allSeemsWell: 'msg' has no visible content"); - } - - if (allSeemsWellMap == null) { - allSeemsWellMap = new HashMap<>(); - } - - if (allNotWellMap == null) { - allNotWellMap = new HashMap<>(); - } - - if (asw) { - logger.info("allSeemsWell: ALL SEEMS WELL: key = {}, msg = {}", key, msg); - try { - allSeemsWellMap.put(key, msg); - } catch (Exception e) { - String exceptMsg = "allSeemsWell: encountered an exception with allSeemsWellMap.put(" + key + "," + msg - + ")"; - logger.error(exceptMsg); - throw new AllSeemsWellException(exceptMsg, e); - } - - try { - allNotWellMap.remove(key); - } catch (Exception e) { - String exceptMsg = "allSeemsWell: encountered an exception with allNotWellMap.delete(" + key + ")"; - logger.error(exceptMsg); - throw new AllSeemsWellException(exceptMsg, e); - } - - } else { - logger.error("allSeemsWell: ALL NOT WELL: key = {}, msg = {}", key, msg); - try { - allSeemsWellMap.remove(key); - } catch (Exception e) { - String exceptMsg = "allSeemsWell: encountered an exception with allSeemsWellMap.remove(" + key + ")"; - logger.error(exceptMsg); - throw new AllSeemsWellException(exceptMsg, e); - } - - try { - allNotWellMap.put(key, msg); - } catch (Exception e) { - String exceptMsg = "allSeemsWell: encountered an exception with allNotWellMap.put(" + key + msg + ")"; - logger.error(exceptMsg); - throw new AllSeemsWellException(exceptMsg, e); - } - } - - if (logger.isDebugEnabled()) { - for (Entry<String, String> entry : allSeemsWellMap.entrySet()) { - logger.debug("allSeemsWellMap: key = {} msg = {}", entry.getKey(), entry.getValue()); - } - for (Entry<String, String> entry : allNotWellMap.entrySet()) { - logger.debug("allNotWellMap: key = {} msg = {}", entry.getKey(), entry.getValue()); - } - logger.debug("allSeemsWell exit"); - } - } - - /** - * Converts the given value to milliseconds using the current - * {@link #propertyUnits}. - * - * @param value - * value to be converted, or -1 - * @return the value, in milliseconds, or -1 - */ - private static long toMillis(long value) { - return (value < 0 ? -1 : propertyUnits.toMillis(value)); - } - - public Map<String, String> getAllSeemsWellMap() { - return allSeemsWellMap; - } - - public Map<String, String> getAllNotWellMap() { - return allNotWellMap; - } - - /* - * The remaining methods are used by JUnit tests. - */ - - public static boolean isUnitTesting() { - return isUnitTesting; - } - - public static void setUnitTesting(boolean isUnitTesting) { - IntegrityMonitor.isUnitTesting = isUnitTesting; - } - - protected static TimeUnit getPropertyUnits() { - return propertyUnits; - } - - protected static void setPropertyUnits(TimeUnit propertyUnits) { - IntegrityMonitor.propertyUnits = propertyUnits; - } - - protected static long getCycleIntervalMillis() { - return cycleIntervalMillis; - } - - protected static void setCycleIntervalMillis(long cycleIntervalMillis) { - IntegrityMonitor.cycleIntervalMillis = cycleIntervalMillis; - } - - protected static String getPersistenceUnit() { - return persistenceUnit; - } - - protected static void setPersistenceUnit(String persistenceUnit) { - IntegrityMonitor.persistenceUnit = persistenceUnit; - } + // Persistence Unit for JPA + public static final String PERSISTENCE_UNIT = "operationalPU"; + + private static String persistenceUnit = PERSISTENCE_UNIT; + + private static final long CYCLE_INTERVAL_MILLIS = 1000L; + + private static long cycleIntervalMillis = CYCLE_INTERVAL_MILLIS; + + /** + * Units used for intervals extracted from the properties, which are typically given in seconds. + */ + private static TimeUnit propertyUnits = TimeUnit.SECONDS; + + private StateManagement stateManager = null; + + private FpManager fpManager = null; + + // The forward progress counter is incremented as the + // process being monitored makes forward progress + private int fpCounter = 0; + private int lastFpCounter = 0; + + // elapsed time since last FP counter check + private long elapsedTime = 0; + + // elapsed time since last test transaction check + private long elapsedTestTransTime = 0; + + // elapsed time since last write Fpc check + private long elapsedWriteFpcTime = 0; + + // last dependency health check time. Initialize so that the periodic check + // starts after 60 seconds. + // This allows time for dependents to come up. + private long lastDependencyCheckTime = System.currentTimeMillis(); + + // Time of the last state audit. It is initialized at the time of the IM + // construction + private Date lastStateAuditTime = new Date(); + + // Interval between state audits in ms. We leave it turned off by default so + // that it will only + // be run on the nodes which we want doing the audit. In particular, we only + // want it to run + // on the droolspdps + private static long stateAuditIntervalMs = 0L; + + // the number of cycles since 'fpCounter' was last changed + private int missedCycles = 0; + + // forward progress monitoring interval + private static long monitorIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_MONITOR_INTERVAL; + // The number of periods the counter fails to increment before an alarm is + // raised. + private static int failedCounterThreshold = IntegrityMonitorProperties.DEFAULT_FAILED_COUNTER_THRESHOLD; + // test transaction interval + private static long testTransIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_TEST_INTERVAL; + // write Fpc to DB interval + private static long writeFpcIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_WRITE_FPC_INTERVAL; + // check the health of dependencies + private static long checkDependencyIntervalMs = + 1000L * IntegrityMonitorProperties.DEFAULT_CHECK_DEPENDENCY_INTERVAL; + + // A lead subsystem will have dependency groups with resource names in the + // properties file. + // For non-lead subsystems, the dependency_group property will be absent. + private static String[] depGroups = null; + + private static boolean isUnitTesting = false; + + // can turn on health checking of dependents via jmx test() call by setting + // this property to true + private static boolean testViaJmx = false; + + private static String jmxFqdn = null; + + // this is the max interval allowed without any forward progress + // counter updates + private static long maxFpcUpdateIntervalMs = 1000L * IntegrityMonitorProperties.DEFAULT_MAX_FPC_UPDATE_INTERVAL; + + // Node types + private enum NodeType { + PDP_XACML, PDP_DROOLS, PAP, PAP_ADMIN, LOGPARSER, BRMS_GATEWAY, ASTRA_GATEWAY, ELK_SERVER, PYPDP + + } + + private static String siteName; + private static String nodeType; + private Date refreshStateAuditLastRunDate; + private static long refreshStateAuditIntervalMs = 600000; // run it once per 10 minutes + + // lock objects + private final Object evaluateSanityLock = new Object(); + private final Object fpMonitorCycleLock = new Object(); + private final Object dependencyCheckLock = new Object(); + private final Object testTransactionLock = new Object(); + private final Object startTransactionLock = new Object(); + private final Object endTransactionLock = new Object(); + private final Object checkTestTransactionLock = new Object(); + private final Object checkWriteFpcLock = new Object(); + private static final Object getInstanceLock = new Object(); + private final Object refreshStateAuditLock = new Object(); + private final Object imFlushLock = new Object(); + + private Map<String, String> allSeemsWellMap; + private Map<String, String> allNotWellMap; + + /** + * IntegrityMonitor constructor. It is invoked from the getInstance() method in this class or + * from the constructor of a child or sub-class. A class can extend the IntegrityMonitor class + * if there is a need to override any of the base methods (ex. subsystemTest()). Only one + * instance is allowed to be created per resource name. + * + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @throws IntegrityMonitorException if any errors are encountered in the constructor + */ + protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException { + + this(resourceName, properties, null); + } + + /** + * IntegrityMonitor constructor. It is invoked from the getInstance() method in this class or + * from the constructor of a child or sub-class. A class can extend the IntegrityMonitor class + * if there is a need to override any of the base methods (ex. subsystemTest()). Only one + * instance is allowed to be created per resource name. + * + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @param queue queue to use to control the FPManager thread, or {@code null} + * @throws IntegrityMonitorException if any errors are encountered in the constructor + */ + protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue) + throws IntegrityMonitorException { + + // singleton check since this constructor can be called from a child or + // sub-class + if (instance != null) { + String msg = "IM object exists and only one instance allowed"; + logger.error("{}", msg); + throw new IntegrityMonitorException("IntegrityMonitor constructor exception: " + msg); + } + instance = this; + + IntegrityMonitor.resourceName = resourceName; + + /* + * Validate that the properties file contains all the needed properties. Throws an + * IntegrityMonitorPropertiesException + */ + validateProperties(properties); + + // construct jmx url + String jmxUrl = getJmxUrl(); + + // + // Create the entity manager factory + // + emf = Persistence.createEntityManagerFactory(persistenceUnit, properties); + // + // Did it get created? + // + if (emf == null) { + logger.error("Error creating IM entity manager factory with persistence unit: {}", persistenceUnit); + throw new IntegrityMonitorException("Unable to create IM Entity Manager Factory"); + } + + // add entry to forward progress and resource registration tables in DB + + // Start a transaction + em = emf.createEntityManager(); + EntityTransaction et = em.getTransaction(); + + et.begin(); + + try { + // if ForwardProgress entry exists for resourceName, update it. If + // not found, create a new entry + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx = null; + if (!fpList.isEmpty()) { + // ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + if (logger.isDebugEnabled()) { + logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName, + fpx.getFpcCount(), fpx.getLastUpdated()); + } + fpx.setFpcCount(fpCounter); + } else { + // Create a forward progress object + logger.debug("Adding resource {} to ForwardProgress table", resourceName); + fpx = new ForwardProgressEntity(); + } + // update/set columns in entry + fpx.setResourceName(resourceName); + em.persist(fpx); + // flush to the DB + synchronized (imFlushLock) { + em.flush(); + } + + // if ResourceRegistration entry exists for resourceName, update it. + // If not found, create a new entry + Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); + rquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ResourceRegistrationEntity rrx = null; + if (!rrList.isEmpty()) { + // ignores multiple results + rrx = (ResourceRegistrationEntity) rrList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(rrx); + if (logger.isDebugEnabled()) { + logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName, + rrx.getResourceUrl(), rrx.getCreatedDate()); + } + rrx.setLastUpdated(new Date()); + } else { + // register resource by adding entry to table in DB + logger.debug("Adding resource {} to ResourceRegistration table", resourceName); + rrx = new ResourceRegistrationEntity(); + } + // update/set columns in entry + rrx.setResourceName(resourceName); + rrx.setResourceUrl(jmxUrl); + rrx.setNodeType(nodeType); + rrx.setSite(siteName); + em.persist(rrx); + // flush to the DB + synchronized (imFlushLock) { + et.commit(); + } + + } catch (Exception e) { + logger.error("IntegrityMonitor constructor DB table update failed with exception: ", e); + try { + if (et.isActive()) { + synchronized (imFlushLock) { + et.rollback(); + } + } + } catch (Exception e1) { + logger.error("IntegrityMonitor constructor threw exception: ", e1); + } + throw e; + } + + try { + // create instance of StateMangement class and pass emf to it + stateManager = new StateManagement(emf, resourceName); + + /** + * Initialize the state and status attributes. This will maintain any Administrative + * state value but will set the operational state = enabled, availability status = null, + * standby status = null. The integrity monitor will set the operational state via the + * FPManager and the owning application must set the standby status by calling + * promote/demote on the StateManager. + */ + stateManager.initializeState(); + + } catch (StateManagementException e) { + throw new IntegrityMonitorException(e); + } + + // create management bean + try { + new ComponentAdmin(resourceName, this, stateManager); + } catch (Exception e) { + logger.error("ComponentAdmin constructor exception: {}", e.toString(), e); + } + + fpManager = new FpManager(queue); + + } + + /** + * Get an instance of IntegrityMonitor for a given resource name. It creates one if it does not + * exist. Only one instance is allowed to be created per resource name. + * + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @return The new instance of IntegrityMonitor + * @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an + * exception + */ + public static IntegrityMonitor getInstance(String resourceName, Properties properties) + throws IntegrityMonitorException { + return getInstance(resourceName, properties, null); + } + + /** + * Get an instance of IntegrityMonitor for a given resource name. It creates one if it does not + * exist. Only one instance is allowed to be created per resource name. + * + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @param queue queue to use to control the FPManager thread, or {@code null} + * @return The new instance of IntegrityMonitor + * @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an + * exception + */ + protected static IntegrityMonitor getInstance(String resourceName, Properties properties, + BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException { + + synchronized (getInstanceLock) { + logger.debug("getInstance() called - resourceName= {}", resourceName); + if (resourceName == null || resourceName.isEmpty() || properties == null) { + logger.error("Error: getIntegrityMonitorInstance() called with invalid input"); + return null; + } + + if (instance == null) { + logger.debug("Creating new instance of IntegrityMonitor"); + instance = new IntegrityMonitor(resourceName, properties, queue); + } + return instance; + } + } + + /** + * Get the single instance. + * + * @return the instance + * @throws IntegrityMonitorException if no instance exists + */ + public static IntegrityMonitor getInstance() throws IntegrityMonitorException { + logger.debug("getInstance() called"); + if (instance == null) { + String msg = "No IntegrityMonitor instance exists." + + " Please use the method IntegrityMonitor.getInstance(String resourceName, Properties properties)"; + throw new IntegrityMonitorPropertiesException(msg); + } else { + return instance; + } + } + + /** + * This is a facility used by JUnit testing to destroy the IntegrityMonitor instance before + * creating a new one. It waits a bit to allow the FPManager to fully exit. + */ + public static void deleteInstance() throws IntegrityMonitorException { + logger.debug("deleteInstance() called"); + synchronized (getInstanceLock) { + if (isUnitTesting() && instance != null && instance.getFpManager() != null) { + FpManager fpm = instance.getFpManager(); + + // Stop the FPManager thread + fpm.stopAndExit(); + + try { + // Make sure it has exited + fpm.join(2000L); + } catch (InterruptedException e) { + logger.error("deleteInstance: Interrupted while waiting for FPManaager to fully exit", e); + Thread.currentThread().interrupt(); + } + + if (fpm.isAlive()) { + logger.error("IntegrityMonitor.deleteInstance() Failed to kill FPManager thread"); + throw new IntegrityMonitorException( + "IntegrityMonitor.deleteInstance() Failed to kill FPManager thread"); + } + + instance = null; + } + } + logger.debug("deleteInstance() exit"); + } + + private FpManager getFpManager() { + return fpManager; + } + + private static String getJmxUrl() throws IntegrityMonitorException { + + // get the jmx remote port and construct the JMX URL + Properties systemProps = System.getProperties(); + String jmxPort = systemProps.getProperty("com.sun.management.jmxremote.port"); + String jmxErrMsg; + if (jmxPort == null) { + jmxErrMsg = "System property com.sun.management.jmxremote.port for JMX remote port is not set"; + logger.error("{}", jmxErrMsg); + throw new IntegrityMonitorException("getJmxUrl exception: " + jmxErrMsg); + } + + int port = 0; + try { + port = Integer.parseInt(jmxPort); + } catch (NumberFormatException e) { + jmxErrMsg = "JMX remote port is not a valid integer value - " + jmxPort; + logger.error("{}", jmxErrMsg); + throw new IntegrityMonitorException("getJmxUrl exception: " + jmxErrMsg); + } + + try { + if (jmxFqdn == null) { + // get FQDN of this host + jmxFqdn = InetAddress.getLocalHost().getCanonicalHostName(); + } + } catch (Exception e) { + String msg = "getJmxUrl could not get hostname"; + logger.error("{}", msg, e); + throw new IntegrityMonitorException("getJmxUrl Exception: " + msg); + } + if (jmxFqdn == null) { + String msg = "getJmxUrl encountered null hostname"; + logger.error("{}", msg); + throw new IntegrityMonitorException("getJmxUrl error: " + msg); + } + + // assemble the jmx url + String jmxUrl = "service:jmx:rmi:///jndi/rmi://" + jmxFqdn + ":" + port + "/jmxrmi"; + + logger.debug("IntegerityMonitor - jmx url={}", jmxUrl); + + return jmxUrl; + } + + /** + * evaluateSanity() is designed to be called by an external entity to evealuate the sanity of + * the node. It checks the operational and administrative states and the standby status. If the + * operational state is disabled, it will include the dependencyCheckErrorMsg which includes + * information about any dependency (node) which has failed. + */ + public void evaluateSanity() throws IntegrityMonitorException { + logger.debug("evaluateSanity called ...."); + synchronized (evaluateSanityLock) { + + String errorMsg = dependencyCheckErrorMsg; + logger.debug("evaluateSanity dependencyCheckErrorMsg = {}", errorMsg); + // check op state and throw exception if disabled + if ((stateManager.getOpState() != null) && stateManager.getOpState().equals(StateManagement.DISABLED)) { + String msg = "Resource " + resourceName + " operation state is disabled. " + errorMsg; + logger.debug("{}", msg); + throw new IntegrityMonitorException(msg); + } + + // check admin state and throw exception if locked + if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { + String msg = "Resource " + resourceName + " is administratively locked"; + logger.debug("{}", msg); + throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); + } + // check standby state and throw exception if cold standby + if ((stateManager.getStandbyStatus() != null) + && stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY)) { + String msg = "Resource " + resourceName + " is cold standby"; + logger.debug("{}", msg); + throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); + } + + } + + } + + /** + * This method checks the forward progress counter and the state of a dependency. If the + * dependency is unavailable or failed, an error message is created which is checked when + * evaluateSanity interface is called. If the error message is set then the evaluateSanity will + * return an error. + * + * @param dep the dependency + */ + public String stateCheck(String dep) { + logger.debug("checking state of dependent resource: {}", dep); + String errorMsg = null; + ForwardProgressEntity forwardProgressEntity = null; + StateManagementEntity stateManagementEntity = null; + + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + + try { + Query query = em.createQuery("Select p from ForwardProgressEntity p where p.resourceName=:resource"); + query.setParameter("resource", dep); + + @SuppressWarnings("rawtypes") + List fpList = query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + + if (!fpList.isEmpty()) { + // exists + forwardProgressEntity = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(forwardProgressEntity); + logger.debug("Found entry in ForwardProgressEntity table for dependent Resource={}", dep); + } else { + errorMsg = dep + ": resource not found in ForwardProgressEntity database table"; + logger.error("{}", errorMsg); + } + synchronized (imFlushLock) { + et.commit(); + } + } catch (Exception ex) { + // log an error + errorMsg = dep + ": ForwardProgressEntity DB operation failed with exception: "; + logger.error("{}", errorMsg, ex); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + + if (errorMsg == null) { + // Start a transaction + et = em.getTransaction(); + et.begin(); + try { + // query if StateManagement entry exists for dependent resource + Query query = em.createQuery("Select p from StateManagementEntity p where p.resourceName=:resource"); + query.setParameter("resource", dep); + + @SuppressWarnings("rawtypes") + List smList = query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + if (!smList.isEmpty()) { + // exist + stateManagementEntity = (StateManagementEntity) smList.get(0); + // refresh the object from DB in case cached data was + // returned + em.refresh(stateManagementEntity); + logger.debug("Found entry in StateManagementEntity table for dependent Resource={}", dep); + } else { + errorMsg = dep + ": resource not found in state management entity database table"; + logger.error("{}", errorMsg); + } + + synchronized (imFlushLock) { + et.commit(); + } + } catch (Exception e) { + // log an error + errorMsg = dep + ": StateManagementEntity DB read failed with exception: "; + logger.error("{}", errorMsg, e); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + } + + // verify that the ForwardProgress is current (check last_updated) + if (errorMsg == null) { + if (forwardProgressEntity != null && stateManagementEntity != null) { + Date date = new Date(); + long diffMs = date.getTime() - forwardProgressEntity.getLastUpdated().getTime(); + logger.debug("IntegrityMonitor.stateCheck(): diffMs = {}", diffMs); + + // Threshold for a stale entry + long staleMs = maxFpcUpdateIntervalMs; + logger.debug("IntegrityMonitor.stateCheck(): staleMs = {}", staleMs); + + if (diffMs > staleMs) { + // ForwardProgress is stale. Disable it + try { + if (!stateManagementEntity.getOpState().equals(StateManagement.DISABLED)) { + logger.debug("IntegrityMonitor.stateCheck(): Changing OpStat = disabled for {}", dep); + stateManager.disableFailed(dep); + } + } catch (Exception e) { + String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep + + "; " + e.getMessage(); + logger.error("{}", msg, e); + } + } + } else { + + if (forwardProgressEntity == null) { + String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep + + "; " + " forwardProgressEntity == null."; + logger.error("{}", msg); + } + + else { + String msg = "IntegrityMonitor.stateCheck(): Failed to diableFail dependent resource = " + dep + + "; " + " stateManagementEntity == null."; + logger.error("{}", msg); + } + } + } + + // check operation, admin and standby states of dependent resource + if (errorMsg == null) { + if (stateManagementEntity != null) { + if ((stateManager.getAdminState() != null) + && stateManagementEntity.getAdminState().equals(StateManagement.LOCKED)) { + errorMsg = dep + ": resource is administratively locked"; + logger.error("{}", errorMsg); + } else if ((stateManager.getOpState() != null) + && stateManagementEntity.getOpState().equals(StateManagement.DISABLED)) { + errorMsg = dep + ": resource is operationally disabled"; + logger.error("{}", errorMsg); + } else if ((stateManager.getStandbyStatus() != null) + && stateManagementEntity.getStandbyStatus().equals(StateManagement.COLD_STANDBY)) { + errorMsg = dep + ": resource is cold standby"; + logger.error("{}", errorMsg); + } + } else { + errorMsg = dep + ": could not check standy state of resource. stateManagementEntity == null."; + logger.error("{}", errorMsg); + } + } + + String returnMsg = "IntegrityMonitor.stateCheck(): returned error_msg: " + errorMsg; + logger.debug("{}", returnMsg); + return errorMsg; + } + + private String fpCheck(String dep) { + logger.debug("checking forward progress count of dependent resource: {}", dep); + + String errorMsg = null; + + // check FPC count - a changing FPC count indicates the resource JVM is + // running + + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + try { + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", dep); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx; + if (!fpList.isEmpty()) { + // ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + if (logger.isDebugEnabled()) { + logger.debug("Dependent resource {} - fpc= {}, lastUpdated={}", dep, fpx.getFpcCount(), + fpx.getLastUpdated()); + } + long currTime = System.currentTimeMillis(); + // if dependent resource FPC has not been updated, consider it + // an error + if ((currTime - fpx.getLastUpdated().getTime()) > maxFpcUpdateIntervalMs) { + errorMsg = dep + ": FP count has not been updated in the last " + maxFpcUpdateIntervalMs + "ms"; + logger.error("{}", errorMsg); + try { + // create instance of StateMangement class for dependent + StateManagement depStateManager = new StateManagement(emf, dep); + if (!depStateManager.getOpState().equals(StateManagement.DISABLED)) { + logger.debug("Forward progress not detected for dependent resource {}. Setting dependent's " + + "state to disable failed.", dep); + depStateManager.disableFailed(); + } + } catch (Exception e) { + // ignore errors + logger.error("Update dependent state failed with exception: ", e); + } + } + } else { + // resource entry not found in FPC table + errorMsg = dep + ": resource not found in ForwardProgressEntity table in the DB"; + logger.error("{}", errorMsg); + } + synchronized (imFlushLock) { + et.commit(); + } + } catch (Exception e) { + // log an error and continue + errorMsg = dep + ": ForwardProgressEntity DB read failed with exception: "; + logger.error("{}", errorMsg, e); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + + return errorMsg; + } + + /** + * Get all forward progress entities. + * + * @return list of all forward progress entities + */ + public List<ForwardProgressEntity> getAllForwardProgressEntity() { + logger.debug("getAllForwardProgressEntity: entry"); + ArrayList<ForwardProgressEntity> fpList = new ArrayList<>(); + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + try { + Query fquery = em.createQuery("Select e from ForwardProgressEntity e"); + @SuppressWarnings("rawtypes") + List myList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + synchronized (imFlushLock) { + et.commit(); + } + logger.debug("getAllForwardProgressEntity: myList.size(): {}", myList.size()); + if (!myList.isEmpty()) { + for (int i = 0; i < myList.size(); i++) { + if (logger.isDebugEnabled()) { + logger.debug("getAllForwardProgressEntity: myList.get({}).getResourceName(): {}", i, + ((ForwardProgressEntity) myList.get(i)).getResourceName()); + } + fpList.add((ForwardProgressEntity) myList.get(i)); + } + } + synchronized (imFlushLock) { + if (et.isActive()) { + et.commit(); + } + } + } catch (Exception e) { + // log an error and continue + String msg = "getAllForwardProgessEntity DB read failed with exception: "; + logger.error("{}", msg, e); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + return fpList; + } + + private String jmxCheck(String dep) { + logger.debug("checking health of dependent by calling test() via JMX on resource: {}", dep); + + String errorMsg = null; + + // get the JMX URL from the database + String jmxUrl = null; + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + try { + // query if ResourceRegistration entry exists for resourceName + Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); + rquery.setParameter("rn", dep); + + @SuppressWarnings("rawtypes") + List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ResourceRegistrationEntity rrx = null; + + if (!rrList.isEmpty()) { + // ignores multiple results + rrx = (ResourceRegistrationEntity) rrList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(rrx); + jmxUrl = rrx.getResourceUrl(); + if (logger.isDebugEnabled()) { + logger.debug("Dependent Resource={}, url={}, createdDate={}", dep, jmxUrl, rrx.getCreatedDate()); + } + } else { + errorMsg = dep + ": resource not found in ResourceRegistrationEntity table in the DB"; + logger.error("{}", errorMsg); + } + + synchronized (imFlushLock) { + et.commit(); + } + } catch (Exception e) { + errorMsg = dep + ": ResourceRegistrationEntity DB read failed with exception: "; + logger.error("{}", errorMsg, e); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + + if (jmxUrl != null) { + JmxAgentConnection jmxAgentConnection = null; + try { + jmxAgentConnection = new JmxAgentConnection(jmxUrl); + MBeanServerConnection mbeanServer = jmxAgentConnection.getMBeanConnection(); + ComponentAdminMBean admin = + JMX.newMXBeanProxy(mbeanServer, ComponentAdmin.getObjectName(dep), ComponentAdminMBean.class); + + // invoke the test method via the jmx proxy + admin.test(); + logger.debug("Dependent resource {} sanity test passed", dep); + } catch (Exception e) { + errorMsg = dep + ": resource sanity test failed with exception: "; + logger.error("{}", errorMsg, e); + } finally { + // close the JMX connector + if (jmxAgentConnection != null) { + jmxAgentConnection.disconnect(); + } + } + } + + return errorMsg; + } + + /** + * Perform a dependency check. + * + * @return an error message detailing any issues found + */ + public String dependencyCheck() { + logger.debug("dependencyCheck: entry"); + synchronized (dependencyCheckLock) { + + // Start with the error message empty + String errorMsg = ""; + boolean dependencyFailure = false; + + /* + * Before we check dependency groups we need to check subsystemTest. + */ + try { + // Test any subsystems that are not covered under the dependency + // relationship + subsystemTest(); + } catch (Exception e) { + logger.error("IntegrityMonitor threw exception", e); + dependencyFailure = true; + // This indicates a subsystemTest failure + try { + if (logger.isDebugEnabled()) { + logger.debug( + "{}: There has been a subsystemTest failure with error:{} Updating this resource's " + + "state to disableDependency", + resourceName, e.getMessage()); + } + // Capture the subsystemTest failure info + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(","); + } + errorMsg = errorMsg.concat(resourceName + ": " + e.getMessage()); + this.stateManager.disableDependency(); + } catch (Exception ex) { + logger.error("IntegrityMonitor threw exception.", ex); + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(","); + } + errorMsg = errorMsg.concat("\n" + resourceName + + ": Failed to disable dependency after subsystemTest failure due to: " + ex.getMessage()); + } + } + + // Check the sanity of dependents for lead subcomponents + if (depGroups != null && depGroups.length > 0) { + // check state of resources in dependency groups + for (String group : depGroups) { + group = group.trim(); + if (group.isEmpty()) { + // ignore empty group + continue; + } + String[] dependencies = group.split(","); + if (logger.isDebugEnabled()) { + logger.debug("group dependencies = {}", Arrays.toString(dependencies)); + } + int realDepCount = 0; + int failDepCount = 0; + for (String dep : dependencies) { + dep = dep.trim(); + if (dep.isEmpty()) { + // ignore empty dependency + continue; + } + realDepCount++; // this is a valid dependency whose state is tracked + // if a resource is down, its FP count will not be incremented + String failMsg = fpCheck(dep); + if (failMsg == null) { + if (testViaJmx) { + failMsg = jmxCheck(dep); + } else { + failMsg = stateCheck(dep); + } + } + if (failMsg != null) { + failDepCount++; + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(", "); + } + errorMsg = errorMsg.concat(failMsg); + } + } // end for (String dep : dependencies) + + // if all dependencies in a group are failed, set this + // resource's state to disable dependency + if ((realDepCount > 0) && (failDepCount == realDepCount)) { + dependencyFailure = true; + try { + logger.debug("All dependents in group {} have failed their health check. Updating this " + + "resource's state to disableDependency", group); + if (stateManager.getAvailStatus() == null || !((stateManager.getAvailStatus()) + .equals(StateManagement.DEPENDENCY) + || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { + // Note: redundant calls are made by + // refreshStateAudit + this.stateManager.disableDependency(); + } + } catch (Exception e) { + logger.error("IntegrityMonitor threw exception.", e); + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(","); + } + errorMsg = errorMsg.concat(resourceName + ": Failed to disable dependency"); + break; // break out on failure and skip checking other groups + } + } + // check the next group + + } // end for (String group : depGroups) + + /* + * We have checked all the dependency groups. If all are ok and subsystemTest + * passed, dependencyFailure == false + */ + if (!dependencyFailure) { + try { + logger.debug( + "All dependency groups have at least one viable member. Updating this resource's state" + + " to enableNoDependency"); + if (stateManager.getAvailStatus() != null + && ((stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) + || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { + // Note: redundant calls are made by + // refreshStateAudit + this.stateManager.enableNoDependency(); + } + // The refreshStateAudit will catch the case where it is disabled but + // availStatus != failed + } catch (Exception e) { + logger.error("IntegrityMonitor threw exception.", e); + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(","); + } + errorMsg = errorMsg.concat(resourceName + ": Failed to enable no dependency"); + } + } + } else if (!dependencyFailure) { + /* + * This is put here to clean up when no dependency group should exist, but one was + * erroneously added which caused the state to be disabled/dependency/coldstandby + * and later removed. We saw this happen in the lab, but is not very likely in a + * production environment...but you never know. + */ + try { + logger.debug("There are no dependents. Updating this resource's state to enableNoDependency"); + if (stateManager.getAvailStatus() != null + && ((stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) + || (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED))) { + // Note: redundant calls are made by refreshStateAudit + this.stateManager.enableNoDependency(); + } + // The refreshStateAudit will catch the case where it is disabled but + // availStatus != failed + } catch (Exception e) { + logger.error("IntegrityMonitor threw exception.", e); + if (!errorMsg.isEmpty()) { + errorMsg = errorMsg.concat(","); + } + errorMsg = errorMsg.concat(resourceName + ": Failed to enable no dependency"); + } + } + + if (!errorMsg.isEmpty()) { + logger.error("Sanity failure detected in a dependent resource: {}", errorMsg); + + } + + dependencyCheckErrorMsg = errorMsg; + lastDependencyCheckTime = System.currentTimeMillis(); + logger.debug("dependencyCheck: exit"); + return errorMsg; + } + } + + /** + * Execute a test transaction. It is called when the test transaction timer fires. It could be + * overridden to provide additional test functionality. If overridden, the overriding method + * must invoke startTransaction() and endTransaction() and check if the allNotWellMap is empty. + */ + public void testTransaction() { + synchronized (testTransactionLock) { + logger.debug("testTransaction: entry"); + // + // startTransaction() not required for testTransaction + // + + // end transaction - increments local FP counter + endTransaction(); + } + } + + /** + * Additional testing for subsystems that do not have a /test interface (for ex. 3rd party + * processes like elk). This method would be overridden by the subsystem. + */ + public void subsystemTest() throws IntegrityMonitorException { + // Testing provided by subsystem + logger.debug("IntegrityMonitor subsystemTest() OK"); + } + + /** + * Checks admin state and resets transaction timer. Called by application at the start of a + * transaction. + * + * @throws AdministrativeStateException throws admin state exception if resource is locked + * @throws StandbyStatusException if resource is in standby + */ + public void startTransaction() throws AdministrativeStateException, StandbyStatusException { + + synchronized (startTransactionLock) { + // check admin state and throw exception if locked + if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { + String msg = "Resource " + resourceName + " is administratively locked"; + + throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); + } + // check standby state and throw exception if locked + + if ((stateManager.getStandbyStatus() != null) + && (stateManager.getStandbyStatus().equals(StateManagement.HOT_STANDBY) + || stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY))) { + String msg = "Resource " + resourceName + " is standby"; + + throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); + } + + // reset transactionTimer so it will not fire + elapsedTestTransTime = 0; + } + } + + /** + * Increment the local forward progress counter. Called by application at the end of each + * transaction (successful or not). + */ + public void endTransaction() { + synchronized (endTransactionLock) { + if (getAllNotWellMap() != null) { + if (!(getAllNotWellMap().isEmpty())) { + /* + * An entity has reported that it is not well. We must not allow the the forward + * progress counter to advance. + */ + String msg = "allNotWellMap:"; + for (Entry<String, String> entry : allNotWellMap.entrySet()) { + msg = msg.concat("\nkey = " + entry.getKey() + " msg = " + entry.getValue()); + } + logger.error("endTransaction: allNotWellMap is NOT EMPTY. Not advancing forward" + + "progress counter. \n{}\n", msg); + return; + } else { + if (logger.isDebugEnabled()) { + if (getAllSeemsWellMap() != null) { + if (!(getAllSeemsWellMap().isEmpty())) { + String msg = "allSeemsWellMap:"; + for (Entry<String, String> entry : allSeemsWellMap.entrySet()) { + msg = msg.concat("\nkey = " + entry.getKey() + " msg = " + entry.getValue()); + } + logger.debug( + "endTransaction: allNotWellMap IS EMPTY and allSeemsWellMap is NOT EMPTY. " + + "Advancing forward progress counter. \n{}\n", + msg); + } + } + } + } + } + // increment local FPC + fpCounter++; + } + } + + // update FP count in DB with local FP count + private void writeFpc() throws IntegrityMonitorException { + + // Start a transaction + EntityTransaction et = em.getTransaction(); + + if (!et.isActive()) { + et.begin(); + } + + try { + // query if ForwardProgress entry exists for resourceName + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx; + if (!fpList.isEmpty()) { + // ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + if (logger.isDebugEnabled()) { + logger.debug("Updating FP entry: Resource={}, fpcCount={}, lastUpdated={}, new fpcCount={}", + resourceName, fpx.getFpcCount(), fpx.getLastUpdated(), fpCounter); + } + fpx.setFpcCount(fpCounter); + em.persist(fpx); + // flush to the DB and commit + synchronized (imFlushLock) { + et.commit(); + } + } else { + // Error - FP entry does not exist + String msg = "FP entry not found in database for resource " + resourceName; + throw new IntegrityMonitorException(msg); + } + } catch (Exception e) { + try { + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } catch (Exception e1) { + logger.error("IntegrityMonitor threw exception.", e1); + } + logger.error("writeFpc DB table commit failed with exception: {}", e); + throw e; + } + } + + // retrieve state manager reference + public final StateManagement getStateManager() { + return this.stateManager; + } + + /** + * Read and validate properties. + * + * @throws IntegrityMonitorPropertiesException if a property is invalid + */ + private static void validateProperties(Properties prop) throws IntegrityMonitorPropertiesException { + + if (prop.getProperty(IntegrityMonitorProperties.DB_DRIVER) == null) { + String msg = IntegrityMonitorProperties.DB_DRIVER + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_URL) == null) { + String msg = IntegrityMonitorProperties.DB_URL + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_USER) == null) { + String msg = IntegrityMonitorProperties.DB_USER + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_PWD) == null) { + String msg = IntegrityMonitorProperties.DB_PWD + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL) != null) { + try { + monitorIntervalMs = toMillis( + Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL).trim())); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.FP_MONITOR_INTERVAL, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD) != null) { + try { + failedCounterThreshold = + Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL) != null) { + try { + testTransIntervalMs = toMillis( + Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL).trim())); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.TEST_TRANS_INTERVAL, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL) != null) { + try { + writeFpcIntervalMs = toMillis( + Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL).trim())); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.WRITE_FPC_INTERVAL, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL) != null) { + try { + checkDependencyIntervalMs = toMillis(Integer + .parseInt(prop.getProperty(IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL).trim())); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.CHECK_DEPENDENCY_INTERVAL, e); + } + } + + // dependency_groups are a semi-colon separated list of groups + // each group is a comma separated list of resource names + // For ex. dependency_groups = site_1.pap_1,site_1.pap_2 ; site_1.pdp_1, + // site_1.pdp_2 + if (prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS) != null) { + try { + depGroups = prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS).split(";"); + if (logger.isDebugEnabled()) { + logger.debug("dependency groups property = {}", Arrays.toString(depGroups)); + } + } catch (Exception e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.DEPENDENCY_GROUPS, e); + } + } + + siteName = prop.getProperty(IntegrityMonitorProperties.SITE_NAME); + if (siteName == null) { + String msg = IntegrityMonitorProperties.SITE_NAME + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } else { + siteName = siteName.trim(); + } + + nodeType = prop.getProperty(IntegrityMonitorProperties.NODE_TYPE); + if (nodeType == null) { + String msg = IntegrityMonitorProperties.NODE_TYPE + " property is null"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } else { + nodeType = nodeType.trim(); + if (!isNodeTypeEnum(nodeType)) { + String msg = IntegrityMonitorProperties.NODE_TYPE + " property " + nodeType + " is invalid"; + logger.error("{}", msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX) != null) { + String jmxTest = prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX).trim(); + testViaJmx = Boolean.parseBoolean(jmxTest); + } + + if (prop.getProperty(IntegrityMonitorProperties.JMX_FQDN) != null) { + jmxFqdn = prop.getProperty(IntegrityMonitorProperties.JMX_FQDN).trim(); + if (jmxFqdn.isEmpty()) { + jmxFqdn = null; + } + } + + if (prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL) != null) { + try { + maxFpcUpdateIntervalMs = toMillis( + Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL).trim())); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS) != null) { + try { + stateAuditIntervalMs = + Long.parseLong(prop.getProperty(IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS)); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.STATE_AUDIT_INTERVAL_MS, e); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS) != null) { + try { + refreshStateAuditIntervalMs = + Long.parseLong(prop.getProperty(IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS)); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: {}", IntegrityMonitorProperties.REFRESH_STATE_AUDIT_INTERVAL_MS, + e); + } + } + + logger.debug("IntegrityMonitor.validateProperties(): Property values \n" + "maxFpcUpdateIntervalMs = {}\n", + maxFpcUpdateIntervalMs); + + return; + } + + /** + * Update properties. + * + * @param newprop the new properties + */ + public static void updateProperties(Properties newprop) { + if (isUnitTesting()) { + try { + validateProperties(newprop); + } catch (IntegrityMonitorPropertiesException e) { + logger.error("IntegrityMonitor threw exception.", e); + } + } else { + logger.debug("Update integrity monitor properties not allowed"); + } + } + + private static boolean isNodeTypeEnum(String nodeType) { + String upper = nodeType.toUpperCase(); + for (NodeType n : NodeType.values()) { + if (n.toString().equals(upper)) { + return true; + } + } + return false; + } + + /** + * Look for "Forward Progress" -- if the 'FPMonitor' is stalled for too long, the operational + * state is changed to 'Disabled', and an alarm is set. The state is restored when forward + * progress continues. + */ + private void fpMonitorCycle() { + logger.debug("fpMonitorCycle(): entry"); + synchronized (fpMonitorCycleLock) { + // monitoring interval checks + if (monitorIntervalMs <= 0) { + logger.debug("fpMonitorCycle(): disabled"); + elapsedTime = 0; + return; // monitoring is disabled + } + + elapsedTime = elapsedTime + cycleIntervalMillis; + if (elapsedTime < monitorIntervalMs) { + return; // monitoring interval not reached + } + + elapsedTime = 0; // reset elapsed time + + try { + if (fpCounter == lastFpCounter) { + // no forward progress + missedCycles += 1; + if (missedCycles >= failedCounterThreshold && !alarmExists) { + logger.debug("Forward progress not detected for resource {}. Setting state to disable failed.", + resourceName); + if (!(stateManager.getOpState()).equals(StateManagement.DISABLED)) { + // Note: The refreshStateAudit will make redundant + // calls + stateManager.disableFailed(); + } + // The refreshStateAudit will catch the case where opStat = disabled and + // availState ! failed/dependency.failed + alarmExists = true; + } + } else { + // forward progress has occurred + lastFpCounter = fpCounter; + missedCycles = 0; + // set op state to enabled + logger.debug("Forward progress detected for resource {}. Setting state to enable not failed.", + resourceName); + if (!(stateManager.getOpState()).equals(StateManagement.ENABLED)) { + // Note: The refreshStateAudit will make redundant calls + stateManager.enableNotFailed(); + } + // The refreshStateAudit will catch the case where opState=enabled and + // availStatus != null + alarmExists = false; + } + } catch (Exception e) { + // log error + logger.error("FP Monitor encountered error. ", e); + } + } + logger.debug("fpMonitorCycle(): exit"); + } + + /** + * Look for "Forward Progress" on other nodes. If they are not making forward progress, check + * their operational state. If it is not disabled, then disable them. + */ + private void stateAudit() { + logger.debug("IntegrityMonitor.stateAudit(): entry"); + if (stateAuditIntervalMs <= 0) { + logger.debug("IntegrityMonitor.stateAudit(): disabled"); + return; // stateAudit is disabled + } + + // Only run from nodes that are operational + if (stateManager.getOpState().equals(StateManagement.DISABLED)) { + logger.debug("IntegrityMonitor.stateAudit(): DISABLED. returning"); + return; + } + if (stateManager.getAdminState().equals(StateManagement.LOCKED)) { + logger.debug("IntegrityMonitor.stateAudit(): LOCKED. returning"); + return; + } + if (!stateManager.getStandbyStatus().equals(StateManagement.NULL_VALUE) + && stateManager.getStandbyStatus() != null) { + if (!stateManager.getStandbyStatus().equals(StateManagement.PROVIDING_SERVICE)) { + logger.debug("IntegrityMonitor.stateAudit(): NOT PROVIDING_SERVICE. returning"); + return; + } + } + + Date date = new Date(); + long timeSinceLastStateAudit = date.getTime() - lastStateAuditTime.getTime(); + if (timeSinceLastStateAudit < stateAuditIntervalMs) { + logger.debug("IntegrityMonitor.stateAudit(): Not time to run. returning"); + return; + } + + executeStateAudit(); + + lastStateAuditTime = date; + + logger.debug("IntegrityMonitor.stateAudit(): exit"); + } + + /** + * Execute state audit. + */ + public void executeStateAudit() { + logger.debug("IntegrityMonitor.executeStateAudit(): entry"); + Date date = new Date(); + + // Get all entries in the forwardprogressentity table + List<ForwardProgressEntity> fpList = getAllForwardProgressEntity(); + + // Check if each forwardprogressentity entry is current + for (ForwardProgressEntity fpe : fpList) { + // If the this is my ForwardProgressEntity, continue + if (fpe.getResourceName().equals(IntegrityMonitor.resourceName)) { + continue; + } + // Make sure you are not getting a cached version + em.refresh(fpe); + long diffMs = date.getTime() - fpe.getLastUpdated().getTime(); + if (logger.isDebugEnabled()) { + logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, diffMs = {}", fpe.getResourceName(), + diffMs); + } + + // Threshold for a stale entry + long staleMs = maxFpcUpdateIntervalMs; + if (logger.isDebugEnabled()) { + logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, staleMs = {}", fpe.getResourceName(), + staleMs); + } + + if (diffMs > staleMs) { + // ForwardProgress is stale. Disable it + // Start a transaction + logger.debug("IntegrityMonitor.executeStateAudit(): resource = {}, FPC is stale. Disabling it"); + EntityTransaction et = em.getTransaction(); + et.begin(); + StateManagementEntity sme = null; + try { + // query if StateManagement entry exists for fpe resource + Query query = + em.createQuery("Select p from StateManagementEntity p where p.resourceName=:resource"); + query.setParameter("resource", fpe.getResourceName()); + + @SuppressWarnings("rawtypes") + List smList = + query.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + if (!smList.isEmpty()) { + // exists + sme = (StateManagementEntity) smList.get(0); + // refresh the object from DB in case cached data was + // returned + em.refresh(sme); + if (logger.isDebugEnabled()) { + logger.debug( + "IntegrityMonitor.executeStateAudit(): Found entry in StateManagementEntity table " + + "for Resource={}", + sme.getResourceName()); + } + } else { + String msg = "IntegrityMonitor.executeStateAudit(): " + fpe.getResourceName() + + ": resource not found in state management entity database table"; + logger.error("{}", msg); + } + synchronized (imFlushLock) { + et.commit(); + } + } catch (Exception e) { + // log an error + logger.error("IntegrityMonitor.executeStateAudit(): {}: StateManagementEntity DB read failed with " + + "exception: ", fpe.getResourceName(), e); + synchronized (imFlushLock) { + if (et.isActive()) { + et.rollback(); + } + } + } + + if (sme != null && !sme.getOpState().equals(StateManagement.DISABLED)) { + if (logger.isDebugEnabled()) { + logger.debug("IntegrityMonitor.executeStateAudit(): Changing OpStat = disabled for {}", + sme.getResourceName()); + } + try { + stateManager.disableFailed(sme.getResourceName()); + } catch (Exception e) { + String msg = "IntegrityMonitor.executeStateAudit(): Failed to disable " + sme.getResourceName(); + logger.error("{}", msg, e); + } + } + } // end if(diffMs > staleMs) + } // end for(ForwardProgressEntity fpe : fpList) + logger.debug("IntegrityMonitor.executeStateAudit(): exit"); + } + + /** + * Execute a test transaction when test transaction interval has elapsed. + */ + private void checkTestTransaction() { + logger.debug("checkTestTransaction(): entry"); + synchronized (checkTestTransactionLock) { + + // test transaction timer checks + if (testTransIntervalMs <= 0) { + logger.debug("checkTestTransaction(): disabled"); + elapsedTestTransTime = 0; + return; // test transaction is disabled + } + + elapsedTestTransTime = elapsedTestTransTime + cycleIntervalMillis; + if (elapsedTestTransTime < testTransIntervalMs) { + return; // test transaction interval not reached + } + + elapsedTestTransTime = 0; // reset elapsed time + + // execute test transaction + testTransaction(); + } + logger.debug("checkTestTransaction(): exit"); + } + + /** + * Updates Fpc counter in database when write Fpc interval has elapsed. + */ + private void checkWriteFpc() { + logger.debug("checkWriteFpc(): entry"); + synchronized (checkWriteFpcLock) { + + // test transaction timer checks + if (writeFpcIntervalMs <= 0) { + logger.debug("checkWriteFpc(): disabled"); + elapsedWriteFpcTime = 0; + return; // write Fpc is disabled + } + + elapsedWriteFpcTime = elapsedWriteFpcTime + cycleIntervalMillis; + if (elapsedWriteFpcTime < writeFpcIntervalMs) { + return; // write Fpc interval not reached + } + + elapsedWriteFpcTime = 0; // reset elapsed time + + // write Fpc to database + try { + writeFpc(); + } catch (Exception e) { + logger.error("IntegrityMonitor threw exception.", e); + } + } + logger.debug("checkWriteFpc(): exit"); + } + + /** + * Execute a dependency health check periodically which also updates this resource's state. + */ + private void checkDependentHealth() { + logger.debug("checkDependentHealth: entry"); + if (checkDependencyIntervalMs <= 0) { + logger.debug("checkDependentHealth: disabled"); + return; // dependency monitoring is disabled + } + + long currTime = System.currentTimeMillis(); + logger.debug("checkDependentHealth currTime - lastDependencyCheckTime = {}", + currTime - lastDependencyCheckTime); + if ((currTime - lastDependencyCheckTime) > checkDependencyIntervalMs) { + // execute dependency check and update this resource's state + + dependencyCheck(); + } + logger.debug("checkDependentHealth: exit"); + } + + /* + * This is a simple refresh audit which is periodically run to assure that the states and status + * attributes are aligned and notifications are sent to any listeners. It is possible for + * state/status to get out of synch and notified systems to be out of synch due to database + * corruption (manual or otherwise) or because a node became isolated. + * + * When the operation (lock/unlock) is called, it will cause a re-evaluation of the state and + * send a notification to all registered observers. + */ + private void refreshStateAudit() { + logger.debug("refreshStateAudit(): entry"); + if (refreshStateAuditIntervalMs <= 0) { + // The audit is disabled + logger.debug("refreshStateAudit(): disabled"); + return; + } + executeRefreshStateAudit(); + logger.debug("refreshStateAudit(): exit"); + } + + /** + * Execute refresh state audit. + */ + public void executeRefreshStateAudit() { + logger.debug("executeRefreshStateAudit(): entry"); + synchronized (refreshStateAuditLock) { + logger.debug("refreshStateAudit: entry"); + Date now = new Date(); + long nowMs = now.getTime(); + long lastTimeMs = refreshStateAuditLastRunDate.getTime(); + logger.debug("refreshStateAudit: ms since last run = {}", nowMs - lastTimeMs); + + if ((nowMs - lastTimeMs) > refreshStateAuditIntervalMs) { + String adminState = stateManager.getAdminState(); + logger.debug("refreshStateAudit: adminState = {}", adminState); + if (adminState.equals(StateManagement.LOCKED)) { + try { + logger.debug("refreshStateAudit: calling lock()"); + stateManager.lock(); + } catch (Exception e) { + logger.error("refreshStateAudit: caught unexpected exception from stateManager.lock(): ", e); + } + } else { // unlocked + try { + logger.debug("refreshStateAudit: calling unlock()"); + stateManager.unlock(); + } catch (Exception e) { + logger.error("refreshStateAudit: caught unexpected exception from stateManager.unlock(): ", e); + } + } + refreshStateAuditLastRunDate = new Date(); + logger.debug("refreshStateAudit: exit"); + } + } + logger.debug("executeRefreshStateAudit(): exit"); + } + + /** + * The following nested class periodically performs the forward progress check, checks + * dependencies, does a refresh state audit and runs the stateAudit. + */ + class FpManager extends Thread { + private final CountDownLatch stopper = new CountDownLatch(1); + + private BlockingQueue<CountDownLatch> queue; + private CountDownLatch progressLatch = null; + + // Constructor - start FP manager thread + FpManager(BlockingQueue<CountDownLatch> queue) { + this.queue = queue; + // set now as the last time the refreshStateAudit ran + IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date(); + // start thread + this.start(); + } + + @Override + public void run() { + logger.debug("FPManager thread running"); + + try { + getLatch(); + decrementLatch(); + + while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) { + getLatch(); + IntegrityMonitor.this.runOnce(); + decrementLatch(); + } + + } catch (InterruptedException e) { + logger.debug("IntegrityMonitor threw exception.", e); + Thread.currentThread().interrupt(); + } + } + + public void stopAndExit() { + stopper.countDown(); + this.interrupt(); + } + + /** + * Gets the next latch from the queue. + * + * @throws InterruptedException + * + */ + private void getLatch() throws InterruptedException { + if (queue != null) { + progressLatch = queue.take(); + } + } + + /** + * Decrements the current latch. + */ + private void decrementLatch() { + if (progressLatch != null) { + progressLatch.countDown(); + } + } + + } + + private void runOnce() { + try { + logger.debug("FPManager calling fpMonitorCycle()"); + // check forward progress timer + fpMonitorCycle(); + + logger.debug("FPManager calling checkTestTransaction()"); + // check test transaction timer + checkTestTransaction(); + + logger.debug("FPManager calling checkWriteFpc()"); + // check write Fpc timer + checkWriteFpc(); + + logger.debug("FPManager calling checkDependentHealth()"); + // check dependency health + checkDependentHealth(); + + logger.debug("FPManager calling refreshStateAudit()"); + // check if it is time to run the refreshStateAudit + refreshStateAudit(); + + logger.debug("FPManager calling stateAudit()"); + // check if it is time to run the stateAudit + stateAudit(); + + } catch (Exception e) { + logger.error("Ignore FPManager thread processing timer(s) exception: ", e); + } + } + + /** + * Set all seems well or not well for the specified key. + * + * @param key the key + * @param asw <code>true</code> if all seems well for the key, <code>false</code> if all seems + * not well for the key + * @param msg message to add for the key + * @throws AllSeemsWellException if an error occurs + */ + public void allSeemsWell(@NotNull String key, @NotNull Boolean asw, @NotNull String msg) + throws AllSeemsWellException { + + logger.debug("allSeemsWell entry: key = {}, asw = {}, msg = {}", key, asw, msg); + if (key == null || key.isEmpty()) { + logger.error("allSeemsWell: 'key' has no visible content"); + throw new IllegalArgumentException("allSeemsWell: 'key' has no visible content"); + } + if (asw == null) { + logger.error("allSeemsWell: 'asw' is null"); + throw new IllegalArgumentException("allSeemsWell: 'asw' is null"); + } + if (msg == null || msg.isEmpty()) { + logger.error("allSeemsWell: 'msg' has no visible content"); + throw new IllegalArgumentException("allSeemsWell: 'msg' has no visible content"); + } + + if (allSeemsWellMap == null) { + allSeemsWellMap = new HashMap<>(); + } + + if (allNotWellMap == null) { + allNotWellMap = new HashMap<>(); + } + + if (asw) { + logger.info("allSeemsWell: ALL SEEMS WELL: key = {}, msg = {}", key, msg); + try { + allSeemsWellMap.put(key, msg); + } catch (Exception e) { + String exceptMsg = + "allSeemsWell: encountered an exception with allSeemsWellMap.put(" + key + "," + msg + ")"; + logger.error(exceptMsg); + throw new AllSeemsWellException(exceptMsg, e); + } + + try { + allNotWellMap.remove(key); + } catch (Exception e) { + String exceptMsg = "allSeemsWell: encountered an exception with allNotWellMap.delete(" + key + ")"; + logger.error(exceptMsg); + throw new AllSeemsWellException(exceptMsg, e); + } + + } else { + logger.error("allSeemsWell: ALL NOT WELL: key = {}, msg = {}", key, msg); + try { + allSeemsWellMap.remove(key); + } catch (Exception e) { + String exceptMsg = "allSeemsWell: encountered an exception with allSeemsWellMap.remove(" + key + ")"; + logger.error(exceptMsg); + throw new AllSeemsWellException(exceptMsg, e); + } + + try { + allNotWellMap.put(key, msg); + } catch (Exception e) { + String exceptMsg = "allSeemsWell: encountered an exception with allNotWellMap.put(" + key + msg + ")"; + logger.error(exceptMsg); + throw new AllSeemsWellException(exceptMsg, e); + } + } + + if (logger.isDebugEnabled()) { + for (Entry<String, String> entry : allSeemsWellMap.entrySet()) { + logger.debug("allSeemsWellMap: key = {} msg = {}", entry.getKey(), entry.getValue()); + } + for (Entry<String, String> entry : allNotWellMap.entrySet()) { + logger.debug("allNotWellMap: key = {} msg = {}", entry.getKey(), entry.getValue()); + } + logger.debug("allSeemsWell exit"); + } + } + + /** + * Converts the given value to milliseconds using the current {@link #propertyUnits}. + * + * @param value value to be converted, or -1 + * @return the value, in milliseconds, or -1 + */ + private static long toMillis(long value) { + return (value < 0 ? -1 : propertyUnits.toMillis(value)); + } + + public Map<String, String> getAllSeemsWellMap() { + return allSeemsWellMap; + } + + public Map<String, String> getAllNotWellMap() { + return allNotWellMap; + } + + /* + * The remaining methods are used by JUnit tests. + */ + + public static boolean isUnitTesting() { + return isUnitTesting; + } + + public static void setUnitTesting(boolean isUnitTesting) { + IntegrityMonitor.isUnitTesting = isUnitTesting; + } + + protected static TimeUnit getPropertyUnits() { + return propertyUnits; + } + + protected static void setPropertyUnits(TimeUnit propertyUnits) { + IntegrityMonitor.propertyUnits = propertyUnits; + } + + protected static long getCycleIntervalMillis() { + return cycleIntervalMillis; + } + + protected static void setCycleIntervalMillis(long cycleIntervalMillis) { + IntegrityMonitor.cycleIntervalMillis = cycleIntervalMillis; + } + + protected static String getPersistenceUnit() { + return persistenceUnit; + } + + protected static void setPersistenceUnit(String persistenceUnit) { + IntegrityMonitor.persistenceUnit = persistenceUnit; + } } |