diff options
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r-- | policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java | 139 |
1 files changed, 136 insertions, 3 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 766848c6..c924fd69 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; -import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Stream; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; +import org.apache.commons.lang.StringUtils; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; import org.onap.policy.drools.core.jmx.PdpJmxListener; +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.core.lock.PolicyResourceLockManager; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants; import org.onap.policy.drools.features.PolicyEngineFeatureApi; @@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration; import org.onap.policy.drools.protocol.configuration.PdpdConfiguration; import org.onap.policy.drools.server.restful.RestManager; import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter; +import org.onap.policy.drools.system.internal.SimpleLockManager; import org.onap.policy.drools.utils.PropertyUtil; import org.onap.policy.drools.utils.logging.LoggerUtil; import org.onap.policy.drools.utils.logging.MdcTransaction; @@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory; * Policy Engine Manager Implementation. */ class PolicyEngineManager implements PolicyEngine { - /** * String literals. */ @@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine { private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped"; private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked"; + public static final String EXECUTOR_THREAD_PROP = "executor.threads"; + protected static final int DEFAULT_EXECUTOR_THREADS = 5; + /** * logger. */ @@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine { private List<HttpServletServer> httpServers = new ArrayList<>(); /** + * Thread pool used to execute background tasks. + */ + private ScheduledExecutorService executorService = null; + + /** + * Lock manager used to create locks. + */ + @Getter(AccessLevel.PROTECTED) + private PolicyResourceLockManager lockManager = null; + + /** * gson parser to decode configuration requests. */ private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create(); @@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine { } @Override + @JsonIgnore + @GsonJsonIgnore + public ScheduledExecutorService getExecutorService() { + return executorService; + } + + private ScheduledExecutorService makeExecutorService(Properties properties) { + int nthreads = DEFAULT_EXECUTOR_THREADS; + try { + nthreads = Integer.valueOf( + properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS))); + + } catch (NumberFormatException e) { + logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e); + } + + return makeScheduledExecutor(nthreads); + } + + private void createLockManager(Properties properties) { + for (PolicyEngineFeatureApi feature : getEngineProviders()) { + try { + this.lockManager = feature.beforeCreateLockManager(this, properties); + if (this.lockManager != null) { + return; + } + } catch (RuntimeException e) { + logger.error("{}: feature {} before-create-lock-manager failure because of {}", this, + feature.getClass().getName(), e.getMessage(), e); + } + } + + try { + this.lockManager = new SimpleLockManager(this, properties); + } catch (RuntimeException e) { + logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e); + this.lockManager = new SimpleLockManager(this, new Properties()); + } + + /* policy-engine dispatch post operation hook */ + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterCreateLockManager(this, properties, this.lockManager), + (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); + } + + @Override public synchronized void configure(Properties properties) { if (properties == null) { @@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: add-http-servers failed", this, e); } + executorService = makeExecutorService(properties); + + createLockManager(properties); + /* policy-engine dispatch post configure hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterConfigure(this), @@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine { AtomicReference<Boolean> success = new AtomicReference<>(true); + try { + success.compareAndSet(true, this.lockManager.start()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ attempt(success, this.httpServers, @@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine { (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, ex.getMessage(), ex)); + try { + success.compareAndSet(true, this.lockManager.stop()); + } catch (final RuntimeException e) { + logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e); + success.set(false); + } + // stop JMX? - /* policy-engine dispatch pre stop hook */ + /* policy-engine dispatch post stop hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterStop(this), (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, @@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine { getTopicEndpointManager().shutdown(); getServletFactory().destroy(); + try { + this.lockManager.shutdown(); + } catch (final RuntimeException e) { + logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e); + } + + executorService.shutdownNow(); + // Stop the JMX listener stopPdpJmxListener(); @@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; + try { + success = (this.lockManager == null || this.lockManager.lock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + /* policy-engine dispatch post lock hook */ FeatureApiUtils.apply(getEngineProviders(), feature -> feature.afterLock(this), @@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine { this.locked = false; boolean success = true; + + try { + success = (this.lockManager == null || this.lockManager.unlock()) && success; + } catch (final RuntimeException e) { + logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e); + success = false; + } + final List<PolicyController> controllers = getControllerFactory().inventory(); for (final PolicyController controller : controllers) { try { @@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine { feature.getClass().getName(), ex.getMessage(), ex)); } + @Override + public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec, + @NonNull LockCallback callback, boolean waitForLock) { + + if (holdSec < 0) { + throw new IllegalArgumentException("holdSec is negative"); + } + + if (lockManager == null) { + throw new IllegalStateException("lock manager has not been initialized"); + } + + return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock); + } + private boolean controllerConfig(PdpdConfiguration config) { /* only this one supported for now */ final List<ControllerConfiguration> configControllers = config.getControllers(); @@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine { protected PolicyEngine getPolicyEngine() { return PolicyEngineConstants.getManager(); } + + protected ScheduledExecutorService makeScheduledExecutor(int nthreads) { + ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads); + exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + exsvc.setRemoveOnCancelPolicy(true); + + return exsvc; + } } |