diff options
Diffstat (limited to 'integrity-monitor/src/main/java/org/openecomp/policy/common/im/IntegrityMonitor.java')
-rw-r--r-- | integrity-monitor/src/main/java/org/openecomp/policy/common/im/IntegrityMonitor.java | 1300 |
1 files changed, 1300 insertions, 0 deletions
diff --git a/integrity-monitor/src/main/java/org/openecomp/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/openecomp/policy/common/im/IntegrityMonitor.java new file mode 100644 index 00000000..6c575ab7 --- /dev/null +++ b/integrity-monitor/src/main/java/org/openecomp/policy/common/im/IntegrityMonitor.java @@ -0,0 +1,1300 @@ +/*- + * ============LICENSE_START======================================================= + * Integrity Monitor + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.policy.common.im; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import javax.management.JMX; +import javax.management.MBeanServerConnection; +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityTransaction; +import javax.persistence.FlushModeType; +import javax.persistence.LockModeType; +import javax.persistence.Persistence; +import javax.persistence.Query; + +//import org.apache.log4j.Logger; + +import org.openecomp.policy.common.im.jmx.*; +import org.openecomp.policy.common.im.jpa.ForwardProgressEntity; +import org.openecomp.policy.common.im.jpa.ResourceRegistrationEntity; +import org.openecomp.policy.common.im.jpa.StateManagementEntity; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; + +/** + * IntegrityMonitor + * Main class for monitoring the integrity of a resource and managing its state. State management follows + * the X.731 ITU standard. + */ +public class IntegrityMonitor { + private static final Logger logger = FlexLogger.getLogger(IntegrityMonitor.class.getName()); + + //private static final Map<String, IntegrityMonitor> imInstances = new HashMap<String, IntegrityMonitor>(); + + // only allow one instance of IntegrityMonitor + private static IntegrityMonitor instance = null; + + private static String resourceName = null; + private boolean fpcError = false; + boolean alarmExists = false; + + /* + * Error message that is written by the dependencyCheck() method. It is made available externally + * through the evaluateSanity() method. + */ + private String dependencyCheckErrorMsg = ""; + + + // The entity manager factory for JPA access + private EntityManagerFactory emf; + private EntityManager em; + + // Persistence Unit for JPA + private static final String PERSISTENCE_UNIT = "operationalPU"; + + private ComponentAdmin admin = null; + private StateManagement stateManager = null; + + private static final int CYCLE_INTERVAL_MILLIS = 1000; + + // The forward progress counter is incremented as the + // process being monitored makes forward progress + private int fpCounter = 0; + private int lastFpCounter = 0; + + // elapsed time since last FP counter check + private long elapsedTime = 0; + + // elapsed time since last test transaction check + private long elapsedTestTransTime = 0; + + // elapsed time since last write Fpc check + private long elapsedWriteFpcTime = 0; + + // last dependency health check time. Initialize so that the periodic check starts after 60 seconds. + // This allows time for dependents to come up. + private long lastDependencyCheckTime = System.currentTimeMillis(); + + // the number of cycles since 'fpCounter' was last changed + private int missedCycles = 0; + + // forward progress monitoring interval + private static int monitorInterval = IntegrityMonitorProperties.DEFAULT_MONITOR_INTERVAL; + // The number of periods the counter fails to increment before an alarm is raised. + private static int failedCounterThreshold = IntegrityMonitorProperties.DEFAULT_FAILED_COUNTER_THRESHOLD; + // test transaction interval + private static int testTransInterval = IntegrityMonitorProperties.DEFAULT_TEST_INTERVAL; + // write Fpc to DB interval + private static int writeFpcInterval = IntegrityMonitorProperties.DEFAULT_WRITE_FPC_INTERVAL; + + // A lead subsystem will have dependency groups with resource names in the properties file. + // For non-lead subsystems, the dependency_group property will be absent. + private static String [] dep_groups = null; + + public static boolean isUnitTesting = false; + + // can turn on health checking of dependents via jmx test() call by setting this property to true + private static boolean testViaJmx = false; + + private static String jmxFqdn = null; + + // this is the max interval allowed without any forward progress counter updates + private static int maxFpcUpdateInterval = IntegrityMonitorProperties.DEFAULT_MAX_FPC_UPDATE_INTERVAL; + + // Node types + private enum NodeType { + pdp_xacml, + pdp_drools, + pap, + pap_admin, + logparser, + brms_gateway, + astra_gateway, + elk_server, + pypdp + + } + + private static String site_name; + private static String node_type; + private Date refreshStateAuditLastRunDate; + private int refreshStateAuditIntervalMs = 60000; //run it once per minute + + //lock objects + private final Object evaluateSanityLock = new Object(); + private final Object fpMonitorCycleLock = new Object(); + private final Object dependencyCheckLock = new Object(); + private final Object testTransactionLock = new Object(); + private final Object startTransactionLock = new Object(); + private final Object endTransactionLock = new Object(); + private final Object checkTestTransactionLock = new Object(); + private final Object checkWriteFpcLock = new Object(); + private static final Object getInstanceLock = new Object(); + private final Object refreshStateAuditLock = new Object(); + private final Object IMFLUSHLOCK = new Object(); + + /** + * Get an instance of IntegrityMonitor for a given resource name. It creates one if it does not exist. + * Only one instance is allowed to be created per resource name. + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @return The new instance of IntegrityMonitor + * @throws Exception if unable to create jmx url or the constructor returns an exception + */ + public static IntegrityMonitor getInstance(String resourceName, Properties properties) throws Exception { + synchronized(getInstanceLock){ + logger.info("getInstance() called - resourceName=" + resourceName); + if (resourceName == null || resourceName.isEmpty() || properties == null) { + logger.error("Error: getIntegrityMonitorInstance() called with invalid input"); + return null; + } + + if (instance == null) { + logger.info("Creating new instance of IntegrityMonitor"); + instance = new IntegrityMonitor(resourceName, properties); + } + return instance; + } + } + + public static IntegrityMonitor getInstance() throws Exception{ + logger.info("getInstance() called"); + if (instance == null) { + String msg = "No IntegrityMonitor instance exists." + + " Please use the method IntegrityMonitor.getInstance(String resourceName, Properties properties)"; + throw new IntegrityMonitorPropertiesException(msg); + }else{ + return instance; + } + } + + public static void deleteInstance(){ + logger.info("deleteInstance() called"); + if(isUnitTesting){ + instance=null; + } + logger.info("deleteInstance() exit"); + } + /** + * IntegrityMonitor constructor. It is invoked from the getInstance() method in + * this class or from the constructor of a child or sub-class. A class can extend + * the IntegrityMonitor class if there is a need to override any of the base + * methods (ex. subsystemTest()). Only one instance is allowed to be created per + * resource name. + * @param resourceName The resource name of the resource + * @param properties a set of properties passed in from the resource + * @throws Exception if any errors are encountered in the consructor + */ + protected IntegrityMonitor(String resourceName, Properties properties) throws Exception { + + // singleton check since this constructor can be called from a child or sub-class + if (instance != null) { + String msg = "IM object exists and only one instance allowed"; + logger.error(msg); + throw new Exception("IntegrityMonitor constructor exception: " + msg); + } + instance = this; + + IntegrityMonitor.resourceName = resourceName; + + /* + * Validate that the properties file contains all the needed properties. Throws + * an IntegrityMonitorPropertiesException + */ + validateProperties(properties); + + // construct jmx url + String jmxUrl = getJmxUrl(); + + // + // Create the entity manager factory + // + emf = Persistence.createEntityManagerFactory(PERSISTENCE_UNIT, properties); + // + // Did it get created? + // + if (emf == null) { + logger.error("Error creating IM entity manager factory with persistence unit: " + + PERSISTENCE_UNIT); + throw new Exception("Unable to create IM Entity Manager Factory"); + } + + // add entry to forward progress and resource registration tables in DB + + // Start a transaction + em = emf.createEntityManager(); + EntityTransaction et = em.getTransaction(); + + et.begin(); + + try { + // if ForwardProgress entry exists for resourceName, update it. If not found, create a new entry + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx = null; + if(!fpList.isEmpty()){ + //ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + logger.info("Resource " + resourceName + " exists and will be updated - old fpc=" + fpx.getFpcCount() + ", lastUpdated=" + fpx.getLastUpdated()); + fpx.setFpcCount(fpCounter); + }else{ + //Create a forward progress object + logger.info("Adding resource " + resourceName + " to ForwardProgress table"); + fpx = new ForwardProgressEntity(); + } + //update/set columns in entry + fpx.setResourceName(resourceName); + em.persist(fpx); + // flush to the DB + synchronized(IMFLUSHLOCK){ + em.flush(); + } + + // if ResourceRegistration entry exists for resourceName, update it. If not found, create a new entry + Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); + rquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List rrList = rquery.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ResourceRegistrationEntity rrx = null; + if(!rrList.isEmpty()){ + //ignores multiple results + rrx = (ResourceRegistrationEntity) rrList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(rrx); + logger.info("Resource " + resourceName + " exists and will be updated - old url=" + rrx.getResourceUrl() + ", createdDate=" + rrx.getCreatedDate()); + rrx.setLastUpdated(new Date()); + }else{ + // register resource by adding entry to table in DB + logger.info("Adding resource " + resourceName + " to ResourceRegistration table"); + rrx = new ResourceRegistrationEntity(); + } + //update/set columns in entry + rrx.setResourceName(resourceName); + rrx.setResourceUrl(jmxUrl); + rrx.setNodeType(node_type); + rrx.setSite(site_name); + em.persist(rrx); + // flush to the DB + synchronized(IMFLUSHLOCK){ + em.flush(); + et.commit(); + } + + } catch (Exception e) { + logger.error("IntegrityMonitor constructor DB table update failed with exception: " + e); + try { + if (et.isActive()) { + synchronized(IMFLUSHLOCK){ + et.rollback(); + } + } + } catch (Exception e1) { + // ignore + } + throw e; + } + + // create instance of StateMangement class and pass emf to it + stateManager = new StateManagement(emf, resourceName); + + /** + * Initialize the state and status attributes. This will maintain any Administrative state value + * but will set the operational state = enabled, availability status = null, standby status = null. + * The integrity monitor will set the operational state via the FPManager and the owning application + * must set the standby status by calling promote/demote on the StateManager. + */ + stateManager.initializeState(); + + + // create management bean + try { + admin = new ComponentAdmin(resourceName, this, stateManager); + } catch (Exception e) { + logger.error("ComponentAdmin constructor exception: " + e.toString()); + } + + // create FPManager inner class + FPManager fpMonitor = new FPManager(); + + + } + + private static String getJmxUrl() throws Exception { + + // get the jmx remote port and construct the JMX URL + Properties systemProps = System.getProperties(); + String jmx_port = systemProps.getProperty("com.sun.management.jmxremote.port"); + String jmx_err_msg = ""; + if (jmx_port == null) { + jmx_err_msg = "System property com.sun.management.jmxremote.port for JMX remote port is not set"; + logger.error(jmx_err_msg); + throw new Exception("getJmxUrl exception: " + jmx_err_msg); + } + + int port = 0; + try { + port = Integer.parseInt(jmx_port); + } catch (NumberFormatException e) { + jmx_err_msg = "JMX remote port is not a valid integer value - " + jmx_port; + logger.error(jmx_err_msg); + throw new Exception("getJmxUrl exception: " + jmx_err_msg); + } + + try { + if (jmxFqdn == null) { + jmxFqdn = InetAddress.getLocalHost().getCanonicalHostName(); // get FQDN of this host + } + } catch (Exception e) { + String msg = "getJmxUrl could not get hostname" + e; + logger.error(msg); + throw new Exception("getJmxUrl Exception: " + msg); + } + if (jmxFqdn == null) { + String msg = "getJmxUrl encountered null hostname"; + logger.error(msg); + throw new Exception("getJmxUrl error: " + msg); + } + + // assemble the jmx url + String jmx_url = "service:jmx:rmi:///jndi/rmi://" + jmxFqdn + ":" + port + "/jmxrmi"; + logger.info("IntegerityMonitor - jmx url=" + jmx_url); + + return jmx_url; + } + /** + * evaluateSanity() is designed to be called by an external entity to evealuate the sanity + * of the node. It checks the operational and administrative states and the standby + * status. If the operational state is disabled, it will include the dependencyCheckErrorMsg + * which includes information about any dependency (node) which has failed. + */ + public void evaluateSanity() throws Exception { + logger.debug("evaluateSanity called ...."); + synchronized(evaluateSanityLock){ + + String error_msg = dependencyCheckErrorMsg; + logger.debug("evaluateSanity dependencyCheckErrorMsg = " + error_msg); + + // check op state and throw exception if disabled + if ((stateManager.getOpState() != null) && stateManager.getOpState().equals(StateManagement.DISABLED)) { + String msg = "Resource " + resourceName + " operation state is disabled. " + error_msg; + logger.debug(msg); + throw new Exception(msg); + } + + // check admin state and throw exception if locked + if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { + String msg = "Resource " + resourceName + " is administratively locked"; + logger.debug(msg); + throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); + } + // check standby state and throw exception if cold standby + if ((stateManager.getStandbyStatus() != null) && stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY)){ + String msg = "Resource " + resourceName + " is cold standby"; + logger.debug(msg); + throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); + } + +/* + * This is checked in the FPManager where the state is coordinated + if (fpcError) { + String msg = resourceName + ": no forward progress detected"; + logger.error(msg); + throw new ForwardProgressException(msg); + } + + * Additional testing to be provided by susbsystemTest() which could be overridden + * This has been moved to dependencyCheck where it is treated as testing of a dependency + subsystemTest(); +*/ + } + + } + + private String stateCheck(String dep) { + logger.debug("checking state of dependent resource: " + dep); + + // get state management entry for dependent resource + StateManagementEntity stateManagementEntity = null; + String error_msg = null; + try { + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + + // query if StateManagement entry exists for dependent resource + Query query = em.createQuery("Select p from StateManagementEntity p where p.resourceName=:resource"); + query.setParameter("resource", dep); + + @SuppressWarnings("rawtypes") + List smList = query.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + if (!smList.isEmpty()) { + // exist + stateManagementEntity = (StateManagementEntity) smList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(stateManagementEntity); + logger.debug("Found entry in StateManagementEntity table for dependent Resource=" + dep); + } else { + error_msg = dep + ": resource not found in state management entity database table"; + logger.error(error_msg); + } + + synchronized(IMFLUSHLOCK){ + et.commit(); + } + } catch (Exception e) { + // log an error + error_msg = dep + ": StateManagementEntity DB read failed with exception: " + e; + logger.error(error_msg); + } + + // check operation, admin and standby states of dependent resource + if (error_msg == null) { + if ((stateManager.getAdminState() != null) && stateManagementEntity.getAdminState().equals(StateManagement.LOCKED)) { + error_msg = dep + ": resource is administratively locked"; + logger.error(error_msg); + } else if ((stateManager.getOpState() != null) && stateManagementEntity.getOpState().equals(StateManagement.DISABLED)) { + error_msg = dep + ": resource is operationally disabled"; + logger.error(error_msg); + } else if ((stateManager.getStandbyStatus() != null) && stateManagementEntity.getStandbyStatus().equals(StateManagement.COLD_STANDBY)) { + error_msg = dep + ": resource is cold standby"; + logger.error(error_msg); + } + } + + return error_msg; + } + + private String fpCheck(String dep) { + logger.debug("checking forward progress count of dependent resource: " + dep); + + String error_msg = null; + + // check FPC count - a changing FPC count indicates the resource JVM is running + + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + try { + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", dep); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx = null; + if (!fpList.isEmpty()) { + //ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + logger.debug("Dependent resource " + dep + " - fpc=" + fpx.getFpcCount() + ", lastUpdated=" + fpx.getLastUpdated()); + long currTime = System.currentTimeMillis(); + // if dependent resource FPC has not been updated, consider it an error + if ((currTime - fpx.getLastUpdated().getTime()) > (1000 * maxFpcUpdateInterval)) { + error_msg = dep + ": FP count has not been updated in the last " + maxFpcUpdateInterval + " seconds"; + logger.error(error_msg); + try { + // create instance of StateMangement class for dependent + StateManagement depStateManager = new StateManagement(emf, dep); + if (depStateManager != null) { + logger.info("Forward progress not detected for dependent resource " + dep + ". Setting dependent's state to disable failed."); + depStateManager.disableFailed(); + } + } catch (Exception e) { + // ignore errors + logger.info("Update dependent state failed with exception: " + e); + } + } + } else { + // resource entry not found in FPC table + error_msg = dep + ": resource not found in ForwardProgressEntity table in the DB"; + logger.error(error_msg); + } + synchronized(IMFLUSHLOCK){ + et.commit(); + } + } catch (Exception e) { + // log an error and continue + error_msg = dep + ": ForwardProgressEntity DB read failed with exception: " + e; + logger.error(error_msg); + } + + return error_msg; + } + + private String jmxCheck(String dep) { + logger.debug("checking health of dependent by calling test() via JMX on resource: " + dep); + + String error_msg = null; + + // get the JMX URL from the database + String jmxUrl = null; + try { + // Start a transaction + EntityTransaction et = em.getTransaction(); + et.begin(); + + // query if ResourceRegistration entry exists for resourceName + Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); + rquery.setParameter("rn", dep); + + @SuppressWarnings("rawtypes") + List rrList = rquery.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ResourceRegistrationEntity rrx = null; + + if (!rrList.isEmpty()) { + //ignores multiple results + rrx = (ResourceRegistrationEntity) rrList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(rrx); + jmxUrl = rrx.getResourceUrl(); + logger.debug("Dependent Resource=" + dep + ", url=" + jmxUrl + ", createdDate=" + rrx.getCreatedDate()); + } else { + error_msg = dep + ": resource not found in ResourceRegistrationEntity table in the DB"; + logger.error(error_msg); + } + + synchronized(IMFLUSHLOCK){ + et.commit(); + } + } catch (Exception e) { + error_msg = dep + ": ResourceRegistrationEntity DB read failed with exception: " + e; + logger.error(error_msg); + } + + + if (jmxUrl != null) { + JmxAgentConnection jmxAgentConnection = null; + try { + jmxAgentConnection = new JmxAgentConnection(jmxUrl); + MBeanServerConnection mbeanServer = jmxAgentConnection.getMBeanConnection(); + ComponentAdminMBean admin = JMX.newMXBeanProxy(mbeanServer, ComponentAdmin.getObjectName(dep), + ComponentAdminMBean.class); + + // invoke the test method via the jmx proxy + admin.test(); + logger.debug("Dependent resource " + dep + " sanity test passed"); + } catch (Exception e) { + error_msg = dep + ": resource sanity test failed with exception: " + e; + logger.error(error_msg); + // TODO: extract real error message from exception which may be nested + } finally { + // close the JMX connector + if (jmxAgentConnection != null) { + jmxAgentConnection.disconnect(); + } + } + } + + return error_msg; + } + + private String dependencyCheck() { + logger.debug("dependencyCheck: entry - checking health of dependent groups and setting resource's state"); + synchronized(dependencyCheckLock){ + + // Start with the error message empty + String error_msg = ""; + boolean dependencyFailure = false; + + + // Check the sanity of dependents for lead subcomponents + if (dep_groups != null && dep_groups.length > 0) { + // check state of resources in dependency groups + for (String group : dep_groups) { + group = group.trim(); + if (group.isEmpty()) { + // ignore empty group + continue; + } + String [] dependencies = group.split(","); + logger.debug("group dependencies = " + Arrays.toString(dependencies)); + int real_dep_count = 0; + int fail_dep_count = 0; + for (String dep : dependencies) { + dep = dep.trim(); + if (dep.isEmpty()) { + // ignore empty dependency + continue; + } + real_dep_count++; // this is a valid dependency whose state is tracked + String fail_msg = fpCheck(dep); // if a resource is down, its FP count will not be incremented + if (fail_msg == null) { + if (testViaJmx) { + fail_msg = jmxCheck(dep); + } else { + fail_msg = stateCheck(dep); + } + } + if (fail_msg != null) { + fail_dep_count++; + if (!error_msg.isEmpty()) { + error_msg = error_msg.concat(", "); + } + error_msg = error_msg.concat(fail_msg); + } + }// end for (String dep : dependencies) + + // if all dependencies in a group are failed, set this resource's state to disable dependency + if ((real_dep_count > 0) && (fail_dep_count == real_dep_count)) { + dependencyFailure=true; + try { + logger.info("All dependents in group " + group + " have failed their health check. Updating this resource's state to disableDependency"); + if( !( (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) || + (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED) ) ){ + // Note: redundant calls are made by refreshStateAudit + this.stateManager.disableDependency(); + }else //corruption has occurred - This will not be corrected by the refreshStateAudit + if(!(stateManager.getOpState()).equals(StateManagement.DISABLED)){ + // Note: redundant calls are made by refreshStateAudit + this.stateManager.disableDependency(); + } + } catch (Exception e) { + if (!error_msg.isEmpty()) { + error_msg = error_msg.concat(","); + } + error_msg = error_msg.concat(resourceName + ": Failed to disable dependency"); + break; // break out on failure and skip checking other groups + } + } + //check the next group + + }//end for (String group : dep_groups) + + /* + * We have checked all the dependency groups. If all are ok, dependencyFailure == false + */ + if(!dependencyFailure){ + try { + logger.debug("All dependency groups have at least one viable member. Updating this resource's state to enableNoDependency"); + if( ( (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) || + (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED) ) ){ + // Note: redundant calls are made by refreshStateAudit + this.stateManager.enableNoDependency(); + } // The refreshStateAudit will catch the case where it is disabled but availStatus != failed + } catch (Exception e) { + if (!error_msg.isEmpty()) { + error_msg = error_msg.concat(","); + } + error_msg = error_msg.concat(resourceName + ": Failed to enable no dependency"); + } + } + }else{ + /* + * This is put here to clean up when no dependency group should exist, but one was erroneously + * added which caused the state to be disabled/dependency/coldstandby and later removed. We saw + * this happen in the lab, but is not very likely in a production environment...but you never know. + */ + try { + logger.debug("There are no dependents. Updating this resource's state to enableNoDependency"); + if( ( (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY) || + (stateManager.getAvailStatus()).equals(StateManagement.DEPENDENCY_FAILED) ) ){ + // Note: redundant calls are made by refreshStateAudit + this.stateManager.enableNoDependency(); + }// The refreshStateAudit will catch the case where it is disabled but availStatus != failed + } catch (Exception e) { + if (!error_msg.isEmpty()) { + error_msg = error_msg.concat(","); + } + error_msg = error_msg.concat(resourceName + ": Failed to enable no dependency"); + } + } + + /* + * We have checked dependency groups and if there were none, we set enableNoDependency. If there were some + * but they are all ok, we set enableNoDependency. So, the recovery from a disabled dependency state + * is handled above. We only need to set disableDependency if the subsystemTest fails. + */ + try { + //Test any subsystems that are not covered under the dependency relationship + subsystemTest(); + }catch (Exception e){ + //This indicates a subsystemTest failure + try { + logger.info(resourceName + ": There has been a subsystemTest failure with error: " + e.getMessage() + " Updating this resource's state to disableDependency"); + //Capture the subsystemTest failure info + if(!error_msg.isEmpty()){ + error_msg = error_msg.concat(","); + } + error_msg = error_msg.concat(resourceName + ": " + e.getMessage()); + this.stateManager.disableDependency(); + } catch (Exception ex) { + if (!error_msg.isEmpty()) { + error_msg = error_msg.concat(","); + } + error_msg = error_msg.concat("\n" + resourceName + ": Failed to disable dependency after subsystemTest failure due to: " + ex.getMessage()); + } + } + + if (!error_msg.isEmpty()) { + logger.error("Sanity failure detected in a dependent resource: " + error_msg); + + } + + dependencyCheckErrorMsg = error_msg; + lastDependencyCheckTime = System.currentTimeMillis(); + return error_msg; + } + } + + /** + * Execute a test transaction. It is called when the test transaction timer fires. + * It could be overridden to provide additional test functionality. If overridden, + * the overriding method must invoke startTransaction() and endTransaction() + */ + public void testTransaction() { + synchronized (testTransactionLock){ + logger.debug("testTransaction called..."); + // start Transaction - resets transaction timer and check admin state + try { + startTransaction(); + } catch (AdministrativeStateException e) { + // ignore + } catch (StandbyStatusException e) { + // ignore + } + + // TODO: add test functionality if needed + + // end transaction - increments local FP counter + endTransaction(); + } + } + + /** + * Additional testing for subsystems that do not have a /test interface (for ex. 3rd party + * processes like elk). This method would be overridden by the subsystem. + */ + public void subsystemTest() throws Exception { + // Testing provided by subsystem + logger.debug("IntegrityMonitor subsystemTest() OK"); + } + + /** + * Checks admin state and resets transaction timer. + * Called by application at the start of a transaction. + * @throws AdministrativeStateException throws admin state exception if resource is locked + * @throws StandbyStatusException + */ + public void startTransaction() throws AdministrativeStateException, StandbyStatusException { + + synchronized(startTransactionLock){ + // check admin state and throw exception if locked + if ((stateManager.getAdminState() != null) && stateManager.getAdminState().equals(StateManagement.LOCKED)) { + String msg = "Resource " + resourceName + " is administratively locked"; + // logger.debug(msg); + throw new AdministrativeStateException("IntegrityMonitor Admin State Exception: " + msg); + } + // check standby state and throw exception if locked + + if ((stateManager.getStandbyStatus() != null) && + (stateManager.getStandbyStatus().equals(StateManagement.HOT_STANDBY) || + stateManager.getStandbyStatus().equals(StateManagement.COLD_STANDBY))){ + String msg = "Resource " + resourceName + " is standby"; + //logger.debug(msg); + throw new StandbyStatusException("IntegrityMonitor Standby Status Exception: " + msg); + } + + // reset transactionTimer so it will not fire + elapsedTestTransTime = 0; + } + } + + /** + * Increment the local forward progress counter. Called by application at the + * end of each transaction (successful or not). + */ + public void endTransaction() { + synchronized(endTransactionLock){ + // increment local FPC + fpCounter++; + } + } + + // update FP count in DB with local FP count + private void writeFpc() throws Exception { + + // Start a transaction + EntityTransaction et = em.getTransaction(); + + if(!et.isActive()){ + et.begin(); + } + + try { + // query if ForwardProgress entry exists for resourceName + Query fquery = em.createQuery("Select f from ForwardProgressEntity f where f.resourceName=:rn"); + fquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode( + LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx = null; + if(!fpList.isEmpty()) { + //ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + logger.debug("Updating FP entry: Resource=" + resourceName + ", fpcCount=" + fpx.getFpcCount() + + ", lastUpdated=" + fpx.getLastUpdated() + ", new fpcCount=" + fpCounter); + fpx.setFpcCount(fpCounter); + em.persist(fpx); + // flush to the DB and commit + synchronized(IMFLUSHLOCK){ + em.flush(); + et.commit(); + } + } + else { + // Error - FP entry does not exist + String msg = "FP entry not found in database for resource " + resourceName; + throw new Exception(msg); + } + } catch (Exception e) { + try { + if (et.isActive()) { + et.rollback(); + } + } catch (Exception e1) { + // ignore + } + logger.error("writeFpc DB table commit failed with exception: " + e); + throw e; + } + } + + // retrieve state manager reference + public final StateManagement getStateManager() { + return this.stateManager; + } + + /** + * Read and validate properties + * @throws Exception + */ + private static void validateProperties(Properties prop) throws IntegrityMonitorPropertiesException { + + if (prop.getProperty(IntegrityMonitorProperties.DB_DRIVER)== null){ + String msg = IntegrityMonitorProperties.DB_DRIVER + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_URL)== null){ + String msg = IntegrityMonitorProperties.DB_URL + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_USER)== null){ + String msg = IntegrityMonitorProperties.DB_USER + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.DB_PWD)== null){ + String msg = IntegrityMonitorProperties.DB_PWD + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + + if (prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL) != null) { + try { + monitorInterval = Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.FP_MONITOR_INTERVAL).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.FP_MONITOR_INTERVAL); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD) != null) { + try { + failedCounterThreshold = Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.FAILED_COUNTER_THRESHOLD); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL) != null) { + try { + testTransInterval = Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.TEST_TRANS_INTERVAL).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.TEST_TRANS_INTERVAL); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL) != null) { + try { + writeFpcInterval = Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.WRITE_FPC_INTERVAL).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.WRITE_FPC_INTERVAL); + } + } + + /*********************** + // followers are a comma separated list of resource names + if (prop.getProperty(IntegrityMonitorProperties.SS_FOLLOWERS) != null) { + try { + followers = prop.getProperty(IntegrityMonitorProperties.SS_FOLLOWERS).split(","); + logger.debug("followers property = " + Arrays.toString(followers)); + } catch (Exception e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.SS_FOLLOWERS); + } + } + **************************/ + + // dependency_groups are a semi-colon separated list of groups + // each group is a comma separated list of resource names + // For ex. dependency_groups = site_1.pap_1,site_1.pap_2 ; site_1.pdp_1, site_1.pdp_2 + if (prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS) != null) { + try { + dep_groups = prop.getProperty(IntegrityMonitorProperties.DEPENDENCY_GROUPS).split(";"); + logger.info("dependency groups property = " + Arrays.toString(dep_groups)); + } catch (Exception e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.DEPENDENCY_GROUPS); + } + } + + site_name = prop.getProperty(IntegrityMonitorProperties.SITE_NAME); + if (site_name == null) { + String msg = IntegrityMonitorProperties.SITE_NAME + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + }else{ + site_name = site_name.trim(); + } + + node_type = prop.getProperty(IntegrityMonitorProperties.NODE_TYPE); + if (node_type == null) { + String msg = IntegrityMonitorProperties.NODE_TYPE + " property is null"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } else { + node_type = node_type.trim(); + if (!isNodeTypeEnum(node_type)) { + String msg = IntegrityMonitorProperties.NODE_TYPE + " property " + node_type + " is invalid"; + logger.error(msg); + throw new IntegrityMonitorPropertiesException("IntegrityMonitor Property Exception: " + msg); + } + } + + if (prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX) != null) { + String jmx_test = prop.getProperty(IntegrityMonitorProperties.TEST_VIA_JMX).trim(); + testViaJmx = Boolean.parseBoolean(jmx_test); + } + + if (prop.getProperty(IntegrityMonitorProperties.JMX_FQDN) != null) { + jmxFqdn = prop.getProperty(IntegrityMonitorProperties.JMX_FQDN).trim(); + if (jmxFqdn.isEmpty()) { + jmxFqdn = null; + } + } + + + if (prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL) != null) { + try { + maxFpcUpdateInterval = Integer.parseInt(prop.getProperty(IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL).trim()); + } catch (NumberFormatException e) { + logger.warn("Ignored invalid property: " + IntegrityMonitorProperties.MAX_FPC_UPDATE_INTERVAL); + } + } + + + return; + } + + public static void updateProperties(Properties newprop) { + if (isUnitTesting) { + try { + validateProperties(newprop); + } catch (IntegrityMonitorPropertiesException e) { + // ignore + } + } + else { + logger.info("Update integrity monitor properties not allowed"); + } + } + + private static boolean isNodeTypeEnum(String nodeType) { + for (NodeType n : NodeType.values()) { + if (n.toString().equals(nodeType)) { + return true; + } + } + return false; + } + + /** + * Look for "Forward Progress" -- if the 'FPMonitor' is stalled + * for too long, the operational state is changed to 'Disabled', + * and an alarm is set. The state is restored when forward + * progress continues. + */ + private void fpMonitorCycle() { + synchronized(fpMonitorCycleLock){ + // monitoring interval checks + if (monitorInterval <= 0) { + elapsedTime = 0; + return; // monitoring is disabled + } + + elapsedTime = elapsedTime + TimeUnit.MILLISECONDS.toSeconds(CYCLE_INTERVAL_MILLIS); + if (elapsedTime < monitorInterval) { + return; // monitoring interval not reached + } + + elapsedTime = 0; // reset elapsed time + + // TODO: check if alarm exists + + try { + if (fpCounter == lastFpCounter) { + // no forward progress + missedCycles += 1; + if (missedCycles >= failedCounterThreshold && !alarmExists) { + // set op state to disabled failed + fpcError = true; + logger.info("Forward progress not detected for resource " + resourceName + ". Setting state to disable failed."); + if(!(stateManager.getOpState()).equals(StateManagement.DISABLED)){ + // Note: The refreshStateAudit will make redundant calls + stateManager.disableFailed(); + }// The refreshStateAudit will catch the case where opStat = disabled and availState ! failed/dependency.failed + // TODO: raise alarm or Nagios alert + alarmExists = true; + } + } else { + // forward progress has occurred + lastFpCounter = fpCounter; + missedCycles = 0; + fpcError = false; + // set op state to enabled + logger.debug("Forward progress detected for resource " + resourceName + ". Setting state to enable not failed."); + if(!(stateManager.getOpState()).equals(StateManagement.ENABLED)){ + // Note: The refreshStateAudit will make redundant calls + stateManager.enableNotFailed(); + }// The refreshStateAudit will catch the case where opState=enabled and availStatus != null + + // TODO: clear alarm or Nagios alert + alarmExists = false; + } + } catch (Exception e) { + // log error + logger.error("FP Monitor encountered error. ", e); + } + } + } + + /** + * Execute a test transaction when test transaction interval has elapsed. + */ + private void checkTestTransaction() { + synchronized(checkTestTransactionLock){ + + // test transaction timer checks + if (testTransInterval <= 0) { + elapsedTestTransTime = 0; + return; // test transaction is disabled + } + + elapsedTestTransTime = elapsedTestTransTime + TimeUnit.MILLISECONDS.toSeconds(CYCLE_INTERVAL_MILLIS); + if (elapsedTestTransTime < testTransInterval) { + return; // test transaction interval not reached + } + + elapsedTestTransTime = 0; // reset elapsed time + + // execute test transaction + testTransaction(); + } + } + + /** + * Updates Fpc counter in database when write Fpc interval has elapsed. + */ + private void checkWriteFpc() { + synchronized(checkWriteFpcLock){ + + // test transaction timer checks + if (writeFpcInterval <= 0) { + elapsedWriteFpcTime = 0; + return; // write Fpc is disabled + } + + elapsedWriteFpcTime = elapsedWriteFpcTime + TimeUnit.MILLISECONDS.toSeconds(CYCLE_INTERVAL_MILLIS); + if (elapsedWriteFpcTime < writeFpcInterval) { + return; // write Fpc interval not reached + } + + elapsedWriteFpcTime = 0; // reset elapsed time + + // write Fpc to database + try { + writeFpc(); + } catch (Exception e) { + // ignore + } + } + } + + /** + * Execute a dependency health check periodically which also updates this resource's state. + */ + private void checkDependentHealth() { + logger.debug("checkDependentHealth: entry"); + + long currTime = System.currentTimeMillis(); + logger.debug("checkDependentHealth currTime - lastDependencyCheckTime = " + (currTime - lastDependencyCheckTime)); + if ((currTime - lastDependencyCheckTime) > (1000 * IntegrityMonitorProperties.DEFAULT_TEST_INTERVAL)) { + // execute dependency check and update this resource's state + + dependencyCheck(); + } + } + + /* + * This is a simple refresh audit which is periodically run to assure that the states and status + * attributes are aligned and notifications are sent to any listeners. It is possible for state/status + * to get out of synch and notified systems to be out of synch due to database corruption (manual or + * otherwise) or because a node became isolated. + * + * When the operation (lock/unlock) is called, it will cause a re-evaluation of the state and + * send a notification to all registered observers. + */ + private void refreshStateAudit(){ + synchronized(refreshStateAuditLock){ + logger.debug("refreshStateAudit: entry"); + Date now = new Date(); + long nowMs = now.getTime(); + long lastTimeMs = refreshStateAuditLastRunDate.getTime(); + logger.debug("refreshStateAudit: ms since last run = " + (nowMs - lastTimeMs)); + + if((nowMs - lastTimeMs) > refreshStateAuditIntervalMs){ + String adminState = stateManager.getAdminState(); + logger.debug("refreshStateAudit: adminState = " + adminState); + if(adminState.equals(StateManagement.LOCKED)){ + try { + logger.debug("refreshStateAudit: calling lock()"); + stateManager.lock(); + } catch (Exception e) { + logger.error("refreshStateAudit: caught unexpected exception from stateManager.lock(): " + e ); + System.out.println(new Date() + " refreshStateAudit: caught unexpected exception " + + "from stateManager.lock()"); + e.printStackTrace(); + } + }else{//unlocked + try { + logger.debug("refreshStateAudit: calling unlock()"); + stateManager.unlock();; + } catch (Exception e) { + logger.error("refreshStateAudit: caught unexpected exception from stateManager.unlock(): " + e ); + System.out.println(new Date() + " refreshStateAudit: caught unexpected exception " + + "from stateManager.unlock()"); + e.printStackTrace(); + } + } + refreshStateAuditLastRunDate = new Date(); + logger.debug("refreshStateAudit: exit"); + } + } + } + + /** + * The following nested class periodically performs the forward progress check, + * checks dependencies and does a refresh state audit. + */ + class FPManager extends Thread { + + // Constructor - start FP manager thread + FPManager() { + // set now as the last time the refreshStateAudit ran + IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date(); + // start thread + this.start(); + } + + public void run() { + logger.info("FPManager thread running"); + while (true) { + try { + Thread.sleep(CYCLE_INTERVAL_MILLIS); + } catch (InterruptedException e) { + // The 'sleep' call was interrupted + continue; + } + + try { + if(logger.isDebugEnabled()){ + logger.debug("FPManager calling fpMonitorCycle()"); + } + // check forward progress timer + IntegrityMonitor.this.fpMonitorCycle(); + + if(logger.isDebugEnabled()){ + logger.debug("FPManager calling checkTestTransaction()"); + } + // check test transaction timer + IntegrityMonitor.this.checkTestTransaction(); + + if(logger.isDebugEnabled()){ + logger.debug("FPManager calling checkWriteFpc()"); + } + // check write Fpc timer + IntegrityMonitor.this.checkWriteFpc(); + + if(logger.isDebugEnabled()){ + logger.debug("FPManager calling checkDependentHealth()"); + } + // check dependency health + IntegrityMonitor.this.checkDependentHealth(); + + if(logger.isDebugEnabled()){ + logger.debug("FPManager calling refreshStateAudit()"); + } + // check if it is time to run the refreshStateAudit + IntegrityMonitor.this.refreshStateAudit(); + + } catch (Exception e) { + logger.debug("Ignore FPManager thread processing timer(s) exception: " + e); + } + } + } + + } + +} + |