aboutsummaryrefslogtreecommitdiffstats
path: root/integrity-monitor/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'integrity-monitor/src/main/java')
-rw-r--r--integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java171
1 files changed, 97 insertions, 74 deletions
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
index d96aa44a..3cb708e2 100644
--- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
+++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
@@ -31,6 +31,8 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.IntConsumer;
+import java.util.function.LongConsumer;
import java.util.function.Supplier;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
@@ -102,7 +104,7 @@ public class IntegrityMonitor {
/**
* Set to {@code null} if to stop running.
*/
- private volatile Thread fpManager = null;
+ private AtomicReference<Thread> fpManager = new AtomicReference<>();
// The forward progress counter is incremented as the
// process being monitored makes forward progress
@@ -212,9 +214,8 @@ public class IntegrityMonitor {
logger.error("{}", msg);
throw new IntegrityMonitorException("IntegrityMonitor constructor exception: " + msg);
}
- instance = this;
- IntegrityMonitor.resourceName = resourceName;
+ setInstance(this, resourceName);
/*
* Validate that the properties file contains all the needed properties. Throws an
@@ -248,71 +249,14 @@ public class IntegrityMonitor {
try {
// if ForwardProgress entry exists for resourceName, update it. If
// not found, create a new entry
- Query fquery = em.createQuery(QUERY_STRING);
- fquery.setParameter("rn", resourceName);
-
- @SuppressWarnings("rawtypes")
- List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList();
- ForwardProgressEntity fpx = null;
- if (!fpList.isEmpty()) {
- // ignores multiple results
- fpx = (ForwardProgressEntity) fpList.get(0);
- // refresh the object from DB in case cached data was returned
- em.refresh(fpx);
- if (logger.isDebugEnabled()) {
- logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName,
- fpx.getFpcCount(), fpx.getLastUpdated());
- }
- fpx.setFpcCount(fpCounter);
- } else {
- // Create a forward progress object
- logger.debug("Adding resource {} to ForwardProgress table", resourceName);
- fpx = new ForwardProgressEntity();
- }
- // update/set columns in entry
- fpx.setResourceName(resourceName);
- em.persist(fpx);
- // flush to the DB
- synchronized (imFlushLock) {
- em.flush();
- }
+ createOrUpdateForwardProgress(resourceName);
// if ResourceRegistration entry exists for resourceName, update it.
// If not found, create a new entry
- Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn");
- rquery.setParameter("rn", resourceName);
-
- @SuppressWarnings("rawtypes")
- List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList();
- ResourceRegistrationEntity rrx = null;
- if (!rrList.isEmpty()) {
- // ignores multiple results
- rrx = (ResourceRegistrationEntity) rrList.get(0);
- // refresh the object from DB in case cached data was returned
- em.refresh(rrx);
- if (logger.isDebugEnabled()) {
- logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName,
- rrx.getResourceUrl(), rrx.getCreatedDate());
- }
- rrx.setLastUpdated(MonitorTime.getInstance().getDate());
- } else {
- // register resource by adding entry to table in DB
- logger.debug("Adding resource {} to ResourceRegistration table", resourceName);
- rrx = new ResourceRegistrationEntity();
- }
- // update/set columns in entry
- rrx.setResourceName(resourceName);
- rrx.setResourceUrl(jmxUrl);
- rrx.setNodeType(nodeType);
- rrx.setSite(siteName);
- em.persist(rrx);
- // flush to the DB
- synchronized (imFlushLock) {
- et.commit();
- }
+ createOrUpdateResourceReg(resourceName, jmxUrl, et);
} catch (Exception e) {
- logger.error("IntegrityMonitor constructor DB table update failed with exception: ", e);
+ logger.error("IntegrityMonitor constructor DB table update threw an exception");
try {
if (et.isActive()) {
synchronized (imFlushLock) {
@@ -325,6 +269,85 @@ public class IntegrityMonitor {
throw e;
}
+ makeStateManager(resourceName);
+
+ // create management bean
+ makeManagementBean(resourceName);
+
+ // set now as the last time the refreshStateAudit ran
+ IntegrityMonitor.this.refreshStateAuditLastRunDate = MonitorTime.getInstance().getDate();
+
+ fpManager.set(new Thread(this::runFpManager));
+ fpManager.get().start();
+
+ }
+
+ protected void createOrUpdateForwardProgress(String resourceName) {
+ Query fquery = em.createQuery(QUERY_STRING);
+ fquery.setParameter("rn", resourceName);
+
+ @SuppressWarnings("rawtypes")
+ List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList();
+ ForwardProgressEntity fpx = null;
+ if (!fpList.isEmpty()) {
+ // ignores multiple results
+ fpx = (ForwardProgressEntity) fpList.get(0);
+ // refresh the object from DB in case cached data was returned
+ em.refresh(fpx);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName,
+ fpx.getFpcCount(), fpx.getLastUpdated());
+ }
+ fpx.setFpcCount(fpCounter);
+ } else {
+ // Create a forward progress object
+ logger.debug("Adding resource {} to ForwardProgress table", resourceName);
+ fpx = new ForwardProgressEntity();
+ }
+ // update/set columns in entry
+ fpx.setResourceName(resourceName);
+ em.persist(fpx);
+ // flush to the DB
+ synchronized (imFlushLock) {
+ em.flush();
+ }
+ }
+
+ protected void createOrUpdateResourceReg(String resourceName, String jmxUrl, EntityTransaction et) {
+ Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn");
+ rquery.setParameter("rn", resourceName);
+
+ @SuppressWarnings("rawtypes")
+ List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList();
+ ResourceRegistrationEntity rrx = null;
+ if (!rrList.isEmpty()) {
+ // ignores multiple results
+ rrx = (ResourceRegistrationEntity) rrList.get(0);
+ // refresh the object from DB in case cached data was returned
+ em.refresh(rrx);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName,
+ rrx.getResourceUrl(), rrx.getCreatedDate());
+ }
+ rrx.setLastUpdated(MonitorTime.getInstance().getDate());
+ } else {
+ // register resource by adding entry to table in DB
+ logger.debug("Adding resource {} to ResourceRegistration table", resourceName);
+ rrx = new ResourceRegistrationEntity();
+ }
+ // update/set columns in entry
+ rrx.setResourceName(resourceName);
+ rrx.setResourceUrl(jmxUrl);
+ rrx.setNodeType(nodeType);
+ rrx.setSite(siteName);
+ em.persist(rrx);
+ // flush to the DB
+ synchronized (imFlushLock) {
+ et.commit();
+ }
+ }
+
+ protected void makeStateManager(String resourceName) throws IntegrityMonitorException {
try {
// create instance of StateManagement class and pass emf to it
stateManager = new StateManagement(emf, resourceName);
@@ -340,20 +363,19 @@ public class IntegrityMonitor {
} catch (StateManagementException e) {
throw new IntegrityMonitorException(e);
}
+ }
- // create management bean
+ protected void makeManagementBean(String resourceName) {
try {
new ComponentAdmin(resourceName, this, stateManager);
} catch (Exception e) {
logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
}
+ }
- // set now as the last time the refreshStateAudit ran
- IntegrityMonitor.this.refreshStateAuditLastRunDate = MonitorTime.getInstance().getDate();
-
- fpManager = new Thread(this::runFpManager);
- fpManager.start();
-
+ private static void setInstance(IntegrityMonitor newInstance, String newResourceName) {
+ instance = newInstance;
+ resourceName = newResourceName;
}
/**
@@ -377,7 +399,8 @@ public class IntegrityMonitor {
if (instance == null) {
logger.debug("Creating new instance of IntegrityMonitor");
- instance = new IntegrityMonitor(resourceName, properties);
+ // note: new() will populate "instance"
+ new IntegrityMonitor(resourceName, properties);
}
return instance;
}
@@ -409,7 +432,7 @@ public class IntegrityMonitor {
synchronized (getInstanceLock) {
if (isUnitTesting() && instance != null && instance.fpManager != null) {
// Stop the FPManager thread
- Thread fpm = instance.fpManager;
+ Thread fpm = instance.fpManager.get();
instance.fpManager = null;
fpm.interrupt();
@@ -1296,7 +1319,7 @@ public class IntegrityMonitor {
return propValue.trim();
}
- private static void setInt(Properties props, String propName, Consumer<Integer> setter) {
+ private static void setInt(Properties props, String propName, IntConsumer setter) {
String propValue = props.getProperty(propName);
if (StringUtils.isBlank(propValue)) {
return;
@@ -1309,7 +1332,7 @@ public class IntegrityMonitor {
}
}
- private static void setLong(Properties props, String propName, Consumer<Long> setter) {
+ private static void setLong(Properties props, String propName, LongConsumer setter) {
String propValue = props.getProperty(propName);
if (StringUtils.isBlank(propValue)) {
return;