summaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java139
1 files changed, 136 insertions, 3 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 766848c6..c924fd69 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,7 +24,6 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
-import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -32,11 +31,16 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.lang.StringUtils;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -54,6 +58,9 @@ import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.jmx.PdpJmxListener;
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
import org.onap.policy.drools.features.PolicyControllerFeatureApi;
import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
import org.onap.policy.drools.features.PolicyEngineFeatureApi;
@@ -67,6 +74,7 @@ import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import org.onap.policy.drools.server.restful.RestManager;
import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter;
+import org.onap.policy.drools.system.internal.SimpleLockManager;
import org.onap.policy.drools.utils.PropertyUtil;
import org.onap.policy.drools.utils.logging.LoggerUtil;
import org.onap.policy.drools.utils.logging.MdcTransaction;
@@ -78,7 +86,6 @@ import org.slf4j.LoggerFactory;
* Policy Engine Manager Implementation.
*/
class PolicyEngineManager implements PolicyEngine {
-
/**
* String literals.
*/
@@ -88,6 +95,9 @@ class PolicyEngineManager implements PolicyEngine {
private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
+ public static final String EXECUTOR_THREAD_PROP = "executor.threads";
+ protected static final int DEFAULT_EXECUTOR_THREADS = 5;
+
/**
* logger.
*/
@@ -134,6 +144,17 @@ class PolicyEngineManager implements PolicyEngine {
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
+ * Thread pool used to execute background tasks.
+ */
+ private ScheduledExecutorService executorService = null;
+
+ /**
+ * Lock manager used to create locks.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private PolicyResourceLockManager lockManager = null;
+
+ /**
* gson parser to decode configuration requests.
*/
private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
@@ -214,6 +235,53 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
+ @JsonIgnore
+ @GsonJsonIgnore
+ public ScheduledExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ private ScheduledExecutorService makeExecutorService(Properties properties) {
+ int nthreads = DEFAULT_EXECUTOR_THREADS;
+ try {
+ nthreads = Integer.valueOf(
+ properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS)));
+
+ } catch (NumberFormatException e) {
+ logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e);
+ }
+
+ return makeScheduledExecutor(nthreads);
+ }
+
+ private void createLockManager(Properties properties) {
+ for (PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ this.lockManager = feature.beforeCreateLockManager(this, properties);
+ if (this.lockManager != null) {
+ return;
+ }
+ } catch (RuntimeException e) {
+ logger.error("{}: feature {} before-create-lock-manager failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ try {
+ this.lockManager = new SimpleLockManager(this, properties);
+ } catch (RuntimeException e) {
+ logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e);
+ this.lockManager = new SimpleLockManager(this, new Properties());
+ }
+
+ /* policy-engine dispatch post operation hook */
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterCreateLockManager(this, properties, this.lockManager),
+ (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
+ }
+
+ @Override
public synchronized void configure(Properties properties) {
if (properties == null) {
@@ -257,6 +325,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: add-http-servers failed", this, e);
}
+ executorService = makeExecutorService(properties);
+
+ createLockManager(properties);
+
/* policy-engine dispatch post configure hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterConfigure(this),
@@ -499,6 +571,13 @@ class PolicyEngineManager implements PolicyEngine {
AtomicReference<Boolean> success = new AtomicReference<>(true);
+ try {
+ success.compareAndSet(true, this.lockManager.start());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
attempt(success, this.httpServers,
@@ -639,9 +718,16 @@ class PolicyEngineManager implements PolicyEngine {
(item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
ex.getMessage(), ex));
+ try {
+ success.compareAndSet(true, this.lockManager.stop());
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e);
+ success.set(false);
+ }
+
// stop JMX?
- /* policy-engine dispatch pre stop hook */
+ /* policy-engine dispatch post stop hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterStop(this),
(feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
@@ -688,6 +774,14 @@ class PolicyEngineManager implements PolicyEngine {
getTopicEndpointManager().shutdown();
getServletFactory().destroy();
+ try {
+ this.lockManager.shutdown();
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e);
+ }
+
+ executorService.shutdownNow();
+
// Stop the JMX listener
stopPdpJmxListener();
@@ -806,6 +900,13 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
+ try {
+ success = (this.lockManager == null || this.lockManager.lock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
/* policy-engine dispatch post lock hook */
FeatureApiUtils.apply(getEngineProviders(),
feature -> feature.afterLock(this),
@@ -833,6 +934,14 @@ class PolicyEngineManager implements PolicyEngine {
this.locked = false;
boolean success = true;
+
+ try {
+ success = (this.lockManager == null || this.lockManager.unlock()) && success;
+ } catch (final RuntimeException e) {
+ logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e);
+ success = false;
+ }
+
final List<PolicyController> controllers = getControllerFactory().inventory();
for (final PolicyController controller : controllers) {
try {
@@ -1167,6 +1276,21 @@ class PolicyEngineManager implements PolicyEngine {
feature.getClass().getName(), ex.getMessage(), ex));
}
+ @Override
+ public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec,
+ @NonNull LockCallback callback, boolean waitForLock) {
+
+ if (holdSec < 0) {
+ throw new IllegalArgumentException("holdSec is negative");
+ }
+
+ if (lockManager == null) {
+ throw new IllegalStateException("lock manager has not been initialized");
+ }
+
+ return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock);
+ }
+
private boolean controllerConfig(PdpdConfiguration config) {
/* only this one supported for now */
final List<ControllerConfiguration> configControllers = config.getControllers();
@@ -1234,4 +1358,13 @@ class PolicyEngineManager implements PolicyEngine {
protected PolicyEngine getPolicyEngine() {
return PolicyEngineConstants.getManager();
}
+
+ protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
+ ScheduledThreadPoolExecutor exsvc = new ScheduledThreadPoolExecutor(nthreads);
+ exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ exsvc.setRemoveOnCancelPolicy(true);
+
+ return exsvc;
+ }
}