diff options
Diffstat (limited to 'integrity-monitor/src/main')
-rw-r--r-- | integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java | 171 | ||||
-rw-r--r-- | integrity-monitor/src/main/resources/logback.xml | 4 |
2 files changed, 98 insertions, 77 deletions
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java index d96aa44a..3cb708e2 100644 --- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java +++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java @@ -31,6 +31,8 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; import java.util.function.Supplier; import javax.management.JMX; import javax.management.MBeanServerConnection; @@ -102,7 +104,7 @@ public class IntegrityMonitor { /** * Set to {@code null} if to stop running. */ - private volatile Thread fpManager = null; + private AtomicReference<Thread> fpManager = new AtomicReference<>(); // The forward progress counter is incremented as the // process being monitored makes forward progress @@ -212,9 +214,8 @@ public class IntegrityMonitor { logger.error("{}", msg); throw new IntegrityMonitorException("IntegrityMonitor constructor exception: " + msg); } - instance = this; - IntegrityMonitor.resourceName = resourceName; + setInstance(this, resourceName); /* * Validate that the properties file contains all the needed properties. Throws an @@ -248,71 +249,14 @@ public class IntegrityMonitor { try { // if ForwardProgress entry exists for resourceName, update it. If // not found, create a new entry - Query fquery = em.createQuery(QUERY_STRING); - fquery.setParameter("rn", resourceName); - - @SuppressWarnings("rawtypes") - List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ForwardProgressEntity fpx = null; - if (!fpList.isEmpty()) { - // ignores multiple results - fpx = (ForwardProgressEntity) fpList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(fpx); - if (logger.isDebugEnabled()) { - logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName, - fpx.getFpcCount(), fpx.getLastUpdated()); - } - fpx.setFpcCount(fpCounter); - } else { - // Create a forward progress object - logger.debug("Adding resource {} to ForwardProgress table", resourceName); - fpx = new ForwardProgressEntity(); - } - // update/set columns in entry - fpx.setResourceName(resourceName); - em.persist(fpx); - // flush to the DB - synchronized (imFlushLock) { - em.flush(); - } + createOrUpdateForwardProgress(resourceName); // if ResourceRegistration entry exists for resourceName, update it. // If not found, create a new entry - Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); - rquery.setParameter("rn", resourceName); - - @SuppressWarnings("rawtypes") - List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); - ResourceRegistrationEntity rrx = null; - if (!rrList.isEmpty()) { - // ignores multiple results - rrx = (ResourceRegistrationEntity) rrList.get(0); - // refresh the object from DB in case cached data was returned - em.refresh(rrx); - if (logger.isDebugEnabled()) { - logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName, - rrx.getResourceUrl(), rrx.getCreatedDate()); - } - rrx.setLastUpdated(MonitorTime.getInstance().getDate()); - } else { - // register resource by adding entry to table in DB - logger.debug("Adding resource {} to ResourceRegistration table", resourceName); - rrx = new ResourceRegistrationEntity(); - } - // update/set columns in entry - rrx.setResourceName(resourceName); - rrx.setResourceUrl(jmxUrl); - rrx.setNodeType(nodeType); - rrx.setSite(siteName); - em.persist(rrx); - // flush to the DB - synchronized (imFlushLock) { - et.commit(); - } + createOrUpdateResourceReg(resourceName, jmxUrl, et); } catch (Exception e) { - logger.error("IntegrityMonitor constructor DB table update failed with exception: ", e); + logger.error("IntegrityMonitor constructor DB table update threw an exception"); try { if (et.isActive()) { synchronized (imFlushLock) { @@ -325,6 +269,85 @@ public class IntegrityMonitor { throw e; } + makeStateManager(resourceName); + + // create management bean + makeManagementBean(resourceName); + + // set now as the last time the refreshStateAudit ran + IntegrityMonitor.this.refreshStateAuditLastRunDate = MonitorTime.getInstance().getDate(); + + fpManager.set(new Thread(this::runFpManager)); + fpManager.get().start(); + + } + + protected void createOrUpdateForwardProgress(String resourceName) { + Query fquery = em.createQuery(QUERY_STRING); + fquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List fpList = fquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ForwardProgressEntity fpx = null; + if (!fpList.isEmpty()) { + // ignores multiple results + fpx = (ForwardProgressEntity) fpList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(fpx); + if (logger.isDebugEnabled()) { + logger.debug("Resource {} exists and will be updated - old fpc= {}, lastUpdated= {}", resourceName, + fpx.getFpcCount(), fpx.getLastUpdated()); + } + fpx.setFpcCount(fpCounter); + } else { + // Create a forward progress object + logger.debug("Adding resource {} to ForwardProgress table", resourceName); + fpx = new ForwardProgressEntity(); + } + // update/set columns in entry + fpx.setResourceName(resourceName); + em.persist(fpx); + // flush to the DB + synchronized (imFlushLock) { + em.flush(); + } + } + + protected void createOrUpdateResourceReg(String resourceName, String jmxUrl, EntityTransaction et) { + Query rquery = em.createQuery("Select r from ResourceRegistrationEntity r where r.resourceName=:rn"); + rquery.setParameter("rn", resourceName); + + @SuppressWarnings("rawtypes") + List rrList = rquery.setLockMode(LockModeType.NONE).setFlushMode(FlushModeType.COMMIT).getResultList(); + ResourceRegistrationEntity rrx = null; + if (!rrList.isEmpty()) { + // ignores multiple results + rrx = (ResourceRegistrationEntity) rrList.get(0); + // refresh the object from DB in case cached data was returned + em.refresh(rrx); + if (logger.isDebugEnabled()) { + logger.debug("Resource {} exists and will be updated - old url= {}, createdDate={}", resourceName, + rrx.getResourceUrl(), rrx.getCreatedDate()); + } + rrx.setLastUpdated(MonitorTime.getInstance().getDate()); + } else { + // register resource by adding entry to table in DB + logger.debug("Adding resource {} to ResourceRegistration table", resourceName); + rrx = new ResourceRegistrationEntity(); + } + // update/set columns in entry + rrx.setResourceName(resourceName); + rrx.setResourceUrl(jmxUrl); + rrx.setNodeType(nodeType); + rrx.setSite(siteName); + em.persist(rrx); + // flush to the DB + synchronized (imFlushLock) { + et.commit(); + } + } + + protected void makeStateManager(String resourceName) throws IntegrityMonitorException { try { // create instance of StateManagement class and pass emf to it stateManager = new StateManagement(emf, resourceName); @@ -340,20 +363,19 @@ public class IntegrityMonitor { } catch (StateManagementException e) { throw new IntegrityMonitorException(e); } + } - // create management bean + protected void makeManagementBean(String resourceName) { try { new ComponentAdmin(resourceName, this, stateManager); } catch (Exception e) { logger.error("ComponentAdmin constructor exception: {}", e.toString(), e); } + } - // set now as the last time the refreshStateAudit ran - IntegrityMonitor.this.refreshStateAuditLastRunDate = MonitorTime.getInstance().getDate(); - - fpManager = new Thread(this::runFpManager); - fpManager.start(); - + private static void setInstance(IntegrityMonitor newInstance, String newResourceName) { + instance = newInstance; + resourceName = newResourceName; } /** @@ -377,7 +399,8 @@ public class IntegrityMonitor { if (instance == null) { logger.debug("Creating new instance of IntegrityMonitor"); - instance = new IntegrityMonitor(resourceName, properties); + // note: new() will populate "instance" + new IntegrityMonitor(resourceName, properties); } return instance; } @@ -409,7 +432,7 @@ public class IntegrityMonitor { synchronized (getInstanceLock) { if (isUnitTesting() && instance != null && instance.fpManager != null) { // Stop the FPManager thread - Thread fpm = instance.fpManager; + Thread fpm = instance.fpManager.get(); instance.fpManager = null; fpm.interrupt(); @@ -1296,7 +1319,7 @@ public class IntegrityMonitor { return propValue.trim(); } - private static void setInt(Properties props, String propName, Consumer<Integer> setter) { + private static void setInt(Properties props, String propName, IntConsumer setter) { String propValue = props.getProperty(propName); if (StringUtils.isBlank(propValue)) { return; @@ -1309,7 +1332,7 @@ public class IntegrityMonitor { } } - private static void setLong(Properties props, String propName, Consumer<Long> setter) { + private static void setLong(Properties props, String propName, LongConsumer setter) { String propValue = props.getProperty(propName); if (StringUtils.isBlank(propValue)) { return; diff --git a/integrity-monitor/src/main/resources/logback.xml b/integrity-monitor/src/main/resources/logback.xml index 1d498631..c7f1b50b 100644 --- a/integrity-monitor/src/main/resources/logback.xml +++ b/integrity-monitor/src/main/resources/logback.xml @@ -2,7 +2,7 @@ ============LICENSE_START======================================================= Integrity Monitor ================================================================================ - Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -147,8 +147,6 @@ <maxFileSize>5MB</maxFileSize> </triggeringPolicy> <encoder> - <!-- <pattern>"%d{HH:mm:ss.SSS} [%thread] %-5level %logger{1024} - - %msg%n"</pattern> --> <pattern>${defaultPattern}</pattern> </encoder> </appender> |