summaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java1321
1 files changed, 1321 insertions, 0 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
new file mode 100644
index 00000000..07202fb9
--- /dev/null
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -0,0 +1,1321 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.system;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.http.server.HttpServletServer;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.core.PolicyContainer;
+import org.onap.policy.drools.core.jmx.PdpJmxListener;
+import org.onap.policy.drools.features.PolicyControllerFeatureApi;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.onap.policy.drools.properties.DroolsProperties;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
+import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
+import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
+import org.onap.policy.drools.server.restful.RestManager;
+import org.onap.policy.drools.server.restful.aaf.AafTelemetryAuthFilter;
+import org.onap.policy.drools.utils.PropertyUtil;
+import org.onap.policy.drools.utils.logging.LoggerUtil;
+import org.onap.policy.drools.utils.logging.MDCTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Policy Engine Manager Implementation.
+ */
+class PolicyEngineManager implements PolicyEngine {
+
+ /**
+ * String literals.
+ */
+ private static final String INVALID_TOPIC_MSG = "Invalid Topic";
+ private static final String INVALID_EVENT_MSG = "Invalid Event";
+
+ private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
+ private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
+
+ /**
+ * logger.
+ */
+ private static final Logger logger = LoggerFactory.getLogger(PolicyEngineManager.class);
+
+ /**
+ * Is the Policy Engine running.
+ */
+ private volatile boolean alive = false;
+
+ /**
+ * Is the engine locked.
+ */
+ private volatile boolean locked = false;
+
+ /**
+ * Properties used to initialize the engine.
+ */
+ private Properties properties;
+
+ /**
+ * Environment Properties.
+ */
+ private final Properties environment = new Properties();
+
+ /**
+ * Policy Engine Sources.
+ */
+ private List<? extends TopicSource> sources = new ArrayList<>();
+
+ /**
+ * Policy Engine Sinks.
+ */
+ private List<? extends TopicSink> sinks = new ArrayList<>();
+
+ /**
+ * Policy Engine HTTP Servers.
+ */
+ private List<HttpServletServer> httpServers = new ArrayList<>();
+
+ /**
+ * gson parser to decode configuration requests.
+ */
+ private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
+
+
+ @Override
+ public synchronized void boot(String[] cliArgs) {
+
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeBoot(this, cliArgs)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ try {
+ globalInitContainer(cliArgs);
+ } catch (final Exception e) {
+ logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
+ }
+
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterBoot(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void setEnvironment(Properties properties) {
+ this.environment.putAll(PropertyUtil.getInterpolatedProperties(properties));
+ }
+
+ @JsonIgnore
+ @GsonJsonIgnore
+ @Override
+ public synchronized Properties getEnvironment() {
+ return this.environment;
+ }
+
+ @Override
+ public synchronized String getEnvironmentProperty(String envKey) {
+ String value = this.environment.getProperty(envKey);
+ if (value == null) {
+ value = System.getProperty(envKey);
+ if (value == null) {
+ value = System.getenv(envKey);
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized String setEnvironmentProperty(String envKey, String envValue) {
+ return (String) this.environment.setProperty(envKey, envValue);
+ }
+
+ @Override
+ public final Properties defaultTelemetryConfig() {
+ final Properties defaultConfig = new Properties();
+
+ defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, "TELEMETRY");
+ defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, TELEMETRY_SERVER_DEFAULT_HOST);
+ defaultConfig.put(
+ PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
+ "" + Integer.toString(TELEMETRY_SERVER_DEFAULT_PORT));
+ defaultConfig.put(
+ PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX,
+ RestManager.class.getPackage().getName());
+ defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "" + Boolean.TRUE);
+ defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
+ + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "" + Boolean.FALSE);
+
+ return defaultConfig;
+ }
+
+ @Override
+ public synchronized void configure(Properties properties) {
+
+ if (properties == null) {
+ logger.warn("No properties provided");
+ throw new IllegalArgumentException("No properties provided");
+ }
+
+ /* policy-engine dispatch pre configure hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeConfigure(this, properties)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ this.properties = properties;
+
+ try {
+ this.sources = getTopicEndpointManager().addTopicSources(properties);
+ for (final TopicSource source : this.sources) {
+ source.register(this);
+ }
+ } catch (final Exception e) {
+ logger.error("{}: add-sources failed", this, e);
+ }
+
+ try {
+ this.sinks = getTopicEndpointManager().addTopicSinks(properties);
+ } catch (final IllegalArgumentException e) {
+ logger.error("{}: add-sinks failed", this, e);
+ }
+
+ try {
+ this.httpServers = getServletFactory().build(properties);
+ for (HttpServletServer server : this.httpServers) {
+ if (server.isAaf()) {
+ server.addFilterClass(null, AafTelemetryAuthFilter.class.getName());
+ }
+ }
+ } catch (final IllegalArgumentException e) {
+ logger.error("{}: add-http-servers failed", this, e);
+ }
+
+ /* policy-engine dispatch post configure hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterConfigure(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean configure(PdpdConfiguration config) {
+
+ if (config == null) {
+ throw new IllegalArgumentException("No configuration provided");
+ }
+
+ final String entity = config.getEntity();
+
+ MDCTransaction mdcTrans = MDCTransaction.newTransaction(config.getRequestId(), "brmsgw");
+ if (this.getSources().size() == 1) {
+ Topic topic = this.getSources().get(0);
+ mdcTrans.setServiceName(topic.getTopic()).setRemoteHost(topic.getServers().toString())
+ .setTargetEntity(config.getEntity());
+ }
+
+ switch (entity) {
+ case PdpdConfiguration.CONFIG_ENTITY_CONTROLLER:
+ boolean success = controllerConfig(config);
+ mdcTrans.resetSubTransaction().setStatusCode(success).transaction();
+ return success;
+ default:
+ final String msg = "Configuration Entity is not supported: " + entity;
+ mdcTrans.resetSubTransaction().setStatusCode(false).setResponseDescription(msg).flush();
+ logger.warn(LoggerUtil.TRANSACTION_LOG_MARKER_NAME, msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ @Override
+ public synchronized PolicyController createPolicyController(String name, Properties properties) {
+
+ String tempName = name;
+ // check if a PROPERTY_CONTROLLER_NAME property is present
+ // if so, override the given name
+
+ final String propertyControllerName = properties.getProperty(DroolsProperties.PROPERTY_CONTROLLER_NAME);
+ if (propertyControllerName != null && !propertyControllerName.isEmpty()) {
+ if (!propertyControllerName.equals(tempName)) {
+ throw new IllegalStateException("Proposed name (" + tempName + ") and properties name ("
+ + propertyControllerName + ") don't match");
+ }
+ tempName = propertyControllerName;
+ }
+
+ PolicyController controller;
+ for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
+ try {
+ controller = controllerFeature.beforeCreate(tempName, properties);
+ if (controller != null) {
+ return controller;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-controller-create failure because of {}", this,
+ controllerFeature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ controller = getControllerFactory().build(tempName, properties);
+ if (this.isLocked()) {
+ controller.lock();
+ }
+
+ // feature hook
+ for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
+ try {
+ if (controllerFeature.afterCreate(controller)) {
+ return controller;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-controller-create failure because of {}", this,
+ controllerFeature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ return controller;
+ }
+
+
+ @Override
+ public List<PolicyController> updatePolicyControllers(List<ControllerConfiguration> configControllers) {
+
+ final List<PolicyController> policyControllers = new ArrayList<>();
+ if (configControllers == null || configControllers.isEmpty()) {
+ logger.info("No controller configuration provided: {}", configControllers);
+ return policyControllers;
+ }
+
+ for (final ControllerConfiguration configController : configControllers) {
+ MDCTransaction mdcTrans = MDCTransaction.newSubTransaction(null).setTargetEntity(configController.getName())
+ .setTargetServiceName(configController.getOperation())
+ .setTargetVirtualEntity("" + configController.getDrools());
+ try {
+ final PolicyController policyController = this.updatePolicyController(configController);
+ policyControllers.add(policyController);
+ mdcTrans.setStatusCode(true).transaction();
+ } catch (final Exception e) {
+ mdcTrans.setStatusCode(false).setResponseCode(e.getClass().getName())
+ .setResponseDescription(e.getMessage()).flush();
+ logger.error(LoggerUtil.TRANSACTION_LOG_MARKER_NAME,
+ "{}: cannot update-policy-controllers because of {}", this, e.getMessage(), e);
+ }
+ }
+
+ return policyControllers;
+ }
+
+ @Override
+ public synchronized PolicyController updatePolicyController(ControllerConfiguration configController) {
+
+ if (configController == null) {
+ throw new IllegalArgumentException("No controller configuration has been provided");
+ }
+
+ final String controllerName = configController.getName();
+ if (controllerName == null || controllerName.isEmpty()) {
+ logger.warn("controller-name must be provided");
+ throw new IllegalArgumentException("No controller configuration has been provided");
+ }
+
+ PolicyController policyController = null;
+ try {
+ final String operation = configController.getOperation();
+ if (operation == null || operation.isEmpty()) {
+ logger.warn("operation must be provided");
+ throw new IllegalArgumentException("operation must be provided");
+ }
+
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+
+ if (policyController == null) {
+
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it working, apply
+ * the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsProperties.RULES_GROUPID, DroolsController.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsProperties.RULES_ARTIFACTID, DroolsController.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsProperties.RULES_VERSION, DroolsController.NO_VERSION);
+
+ policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+
+ /* fall through to do brain update operation */
+ }
+
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ return policyController;
+ } catch (final Exception e) {
+ logger.error("{}: cannot update-policy-controller because of {}", this, e.getMessage(), e);
+ throw e;
+ } catch (final LinkageError e) {
+ logger.error("{}: cannot update-policy-controllers (rules) because of {}", this, e.getMessage(), e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public synchronized boolean start() {
+
+ /* policy-engine dispatch pre start hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeStart(this)) {
+ return true;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ boolean success = true;
+ if (this.locked) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+ this.alive = true;
+
+ /* Start Policy Engine exclusively-owned (unmanaged) http servers */
+
+ for (final HttpServletServer httpServer : this.httpServers) {
+ try {
+ if (!httpServer.waitedStart(10 * 1000L)) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
+ }
+ }
+
+ /* Start Policy Engine exclusively-owned (unmanaged) sources */
+
+ for (final TopicSource source : this.sources) {
+ try {
+ if (!source.start()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
+ }
+ }
+
+ /* Start Policy Engine owned (unmanaged) sinks */
+
+ for (final TopicSink sink : this.sinks) {
+ try {
+ if (!sink.start()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
+ }
+ }
+
+ /* Start Policy Controllers */
+
+ final List<PolicyController> controllers = getControllerFactory().inventory();
+ for (final PolicyController controller : controllers) {
+ try {
+ if (!controller.start()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
+ e);
+ success = false;
+ }
+ }
+
+ /* Start managed Topic Endpoints */
+
+ try {
+ if (!getTopicEndpointManager().start()) {
+ success = false;
+ }
+ } catch (final IllegalStateException e) {
+ logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
+ }
+
+
+ // Start the JMX listener
+
+ startPdpJmxListener();
+
+ /* policy-engine dispatch after start hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterStart(this)) {
+ return success;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ return success;
+ }
+
+ @Override
+ public synchronized boolean stop() {
+
+ /* policy-engine dispatch pre stop hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeStop(this)) {
+ return true;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ /* stop regardless of the lock state */
+
+ boolean success = true;
+ if (!this.alive) {
+ return true;
+ }
+
+ this.alive = false;
+
+ final List<PolicyController> controllers = getControllerFactory().inventory();
+ for (final PolicyController controller : controllers) {
+ try {
+ if (!controller.stop()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
+ success = false;
+ }
+ }
+
+ /* Stop Policy Engine owned (unmanaged) sources */
+ for (final TopicSource source : this.sources) {
+ try {
+ if (!source.stop()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
+ }
+ }
+
+ /* Stop Policy Engine owned (unmanaged) sinks */
+ for (final TopicSink sink : this.sinks) {
+ try {
+ if (!sink.stop()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
+ }
+ }
+
+ /* stop all managed topics sources and sinks */
+ if (!getTopicEndpointManager().stop()) {
+ success = false;
+ }
+
+ /* stop all unmanaged http servers */
+ for (final HttpServletServer httpServer : this.httpServers) {
+ try {
+ if (!httpServer.stop()) {
+ success = false;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
+ }
+ }
+
+ // stop JMX?
+
+ /* policy-engine dispatch pre stop hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterStop(this)) {
+ return success;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ return success;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+
+ /*
+ * shutdown activity even when underlying subcomponents (features, controllers, topics, etc
+ * ..) are stuck
+ */
+
+ Thread exitThread = makeShutdownThread();
+ exitThread.start();
+
+ /* policy-engine dispatch pre shutdown hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeShutdown(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ this.alive = false;
+
+ /* Shutdown Policy Engine owned (unmanaged) sources */
+ for (final TopicSource source : this.sources) {
+ try {
+ source.shutdown();
+ } catch (final Exception e) {
+ logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
+ }
+ }
+
+ /* Shutdown Policy Engine owned (unmanaged) sinks */
+ for (final TopicSink sink : this.sinks) {
+ try {
+ sink.shutdown();
+ } catch (final Exception e) {
+ logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
+ }
+ }
+
+ /* Shutdown managed resources */
+ getControllerFactory().shutdown();
+ getTopicEndpointManager().shutdown();
+ getServletFactory().destroy();
+
+ // Stop the JMX listener
+
+ stopPdpJmxListener();
+
+ /* policy-engine dispatch post shutdown hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterShutdown(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ /**
+ * Thread that shuts down http servers.
+ */
+ protected class ShutdownThread extends Thread {
+ private static final long SHUTDOWN_MAX_GRACE_TIME = 30000L;
+
+ @Override
+ public void run() {
+ try {
+ doSleep(SHUTDOWN_MAX_GRACE_TIME);
+ logger.warn("{}: abnormal termination - shutdown graceful time period expiration",
+ PolicyEngineManager.this);
+ } catch (final InterruptedException e) {
+ /* courtesy to shutdown() to allow it to return */
+ synchronized (PolicyEngineManager.this) {
+ }
+ logger.info("{}: finishing a graceful shutdown ", PolicyEngineManager.this, e);
+ } finally {
+ /*
+ * shut down the Policy Engine owned http servers as the very last thing
+ */
+ for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
+ try {
+ httpServer.shutdown();
+ } catch (final Exception e) {
+ logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
+ httpServer, e.getMessage(), e);
+ }
+ }
+
+ logger.info("{}: exit", PolicyEngineManager.this);
+ doExit(0);
+ }
+ }
+
+ // these may be overridden by junit tests
+
+ protected void doSleep(long sleepMs) throws InterruptedException {
+ Thread.sleep(sleepMs);
+ }
+
+ protected void doExit(int code) {
+ System.exit(code);
+ }
+ }
+
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ @Override
+ public synchronized boolean lock() {
+
+ /* policy-engine dispatch pre lock hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeLock(this)) {
+ return true;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ if (this.locked) {
+ return true;
+ }
+
+ this.locked = true;
+
+ boolean success = true;
+ final List<PolicyController> controllers = getControllerFactory().inventory();
+ for (final PolicyController controller : controllers) {
+ try {
+ success = controller.lock() && success;
+ } catch (final Exception e) {
+ logger.error("{}: cannot lock policy-controller {} because of {}", this, controller, e.getMessage(), e);
+ success = false;
+ }
+ }
+
+ success = getTopicEndpointManager().lock() && success;
+
+ /* policy-engine dispatch post lock hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterLock(this)) {
+ return success;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ return success;
+ }
+
+ @Override
+ public synchronized boolean unlock() {
+
+ /* policy-engine dispatch pre unlock hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeUnlock(this)) {
+ return true;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ if (!this.locked) {
+ return true;
+ }
+
+ this.locked = false;
+
+ boolean success = true;
+ final List<PolicyController> controllers = getControllerFactory().inventory();
+ for (final PolicyController controller : controllers) {
+ try {
+ success = controller.unlock() && success;
+ } catch (final Exception e) {
+ logger.error("{}: cannot unlock policy-controller {} because of {}", this, controller, e.getMessage(),
+ e);
+ success = false;
+ }
+ }
+
+ success = getTopicEndpointManager().unlock() && success;
+
+ /* policy-engine dispatch after unlock hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterUnlock(this)) {
+ return success;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ return success;
+ }
+
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public void removePolicyController(String name) {
+ getControllerFactory().destroy(name);
+ }
+
+ @Override
+ public void removePolicyController(PolicyController controller) {
+ getControllerFactory().destroy(controller);
+ }
+
+ @JsonIgnore
+ @GsonJsonIgnore
+ @Override
+ public List<PolicyController> getPolicyControllers() {
+ return getControllerFactory().inventory();
+ }
+
+ @JsonProperty("controllers")
+ @GsonJsonProperty("controllers")
+ @Override
+ public List<String> getPolicyControllerIds() {
+ final List<String> controllerNames = new ArrayList<>();
+ for (final PolicyController controller : getControllerFactory().inventory()) {
+ controllerNames.add(controller.getName());
+ }
+ return controllerNames;
+ }
+
+ @Override
+ @JsonIgnore
+ @GsonJsonIgnore
+ public Properties getProperties() {
+ return this.properties;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<TopicSource> getSources() {
+ return (List<TopicSource>) this.sources;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<TopicSink> getSinks() {
+ return (List<TopicSink>) this.sinks;
+ }
+
+ @Override
+ public List<HttpServletServer> getHttpServers() {
+ return this.httpServers;
+ }
+
+ @Override
+ public List<String> getFeatures() {
+ final List<String> features = new ArrayList<>();
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ features.add(feature.getName());
+ }
+ return features;
+ }
+
+ @JsonIgnore
+ @GsonJsonIgnore
+ @Override
+ public List<PolicyEngineFeatureApi> getFeatureProviders() {
+ return getEngineProviders();
+ }
+
+ @Override
+ public PolicyEngineFeatureApi getFeatureProvider(String featureName) {
+ if (featureName == null || featureName.isEmpty()) {
+ throw new IllegalArgumentException("A feature name must be provided");
+ }
+
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ if (feature.getName().equals(featureName)) {
+ return feature;
+ }
+ }
+
+ throw new IllegalArgumentException("Invalid Feature Name: " + featureName);
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
+ /* policy-engine pre topic event hook */
+ for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
+ try {
+ if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, e.getMessage(), e);
+ }
+ }
+
+ /* configuration request */
+ PdpdConfiguration configuration = null;
+ try {
+ configuration = this.decoder.fromJson(event, PdpdConfiguration.class);
+ this.configure(configuration);
+ } catch (final Exception e) {
+ logger.error("{}: configuration-error due to {} because of {}", this, event, e.getMessage(), e);
+ }
+
+ /* policy-engine after topic event hook */
+ for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
+ try {
+ if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean deliver(String topic, Object event) {
+
+ /*
+ * Note this entry point is usually from the DRL
+ */
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException(INVALID_TOPIC_MSG);
+ }
+
+ if (event == null) {
+ throw new IllegalArgumentException(INVALID_EVENT_MSG);
+ }
+
+ if (!this.isAlive()) {
+ throw new IllegalStateException(ENGINE_STOPPED_MSG);
+ }
+
+ if (this.isLocked()) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+ final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ if (topicSinks == null || topicSinks.size() != 1) {
+ throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
+ }
+
+ return this.deliver(topicSinks.get(0).getTopicCommInfrastructure(), topic, event);
+ }
+
+ @Override
+ public boolean deliver(String busType, String topic, Object event) {
+
+ /*
+ * Note this entry point is usually from the DRL (one of the reasons busType is String.
+ */
+
+ if (busType == null || busType.isEmpty()) {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException(INVALID_TOPIC_MSG);
+ }
+
+ if (event == null) {
+ throw new IllegalArgumentException(INVALID_EVENT_MSG);
+ }
+
+ boolean valid = false;
+ for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
+ if (comm.name().equals(busType)) {
+ valid = true;
+ }
+ }
+
+ if (!valid) {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
+ }
+
+
+ if (!this.isAlive()) {
+ throw new IllegalStateException(ENGINE_STOPPED_MSG);
+ }
+
+ if (this.isLocked()) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+
+ return this.deliver(Topic.CommInfrastructure.valueOf(busType), topic, event);
+ }
+
+ @Override
+ public boolean deliver(Topic.CommInfrastructure busType, String topic, Object event) {
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException(INVALID_TOPIC_MSG);
+ }
+
+ if (event == null) {
+ throw new IllegalArgumentException(INVALID_EVENT_MSG);
+ }
+
+ if (!this.isAlive()) {
+ throw new IllegalStateException(ENGINE_STOPPED_MSG);
+ }
+
+ if (this.isLocked()) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+ /*
+ * Try to send through the controller, this is the preferred way, since it may want to apply
+ * additional processing
+ */
+ try {
+ final DroolsController droolsController = getProtocolCoder().getDroolsController(topic, event);
+ final PolicyController controller = getControllerFactory().get(droolsController);
+ if (controller != null) {
+ return controller.deliver(busType, topic, event);
+ }
+ } catch (final Exception e) {
+ logger.warn("{}: cannot find policy-controller to deliver {} over {}:{} because of {}", this, event,
+ busType, topic, e.getMessage(), e);
+
+ /* continue (try without routing through the controller) */
+ }
+
+ /*
+ * cannot route through the controller, send directly through the topic sink
+ */
+ try {
+ final String json = getProtocolCoder().encode(topic, event);
+ return this.deliver(busType, topic, json);
+
+ } catch (final Exception e) {
+ logger.warn("{}: cannot deliver {} over {}:{} because of {}", this, event, busType, topic, e.getMessage(),
+ e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean deliver(Topic.CommInfrastructure busType, String topic, String event) {
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException(INVALID_TOPIC_MSG);
+ }
+
+ if (event == null || event.isEmpty()) {
+ throw new IllegalArgumentException(INVALID_EVENT_MSG);
+ }
+
+ if (!this.isAlive()) {
+ throw new IllegalStateException(ENGINE_STOPPED_MSG);
+ }
+
+ if (this.isLocked()) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+ try {
+ final TopicSink sink = getTopicEndpointManager().getTopicSink(busType, topic);
+
+ if (sink == null) {
+ throw new IllegalStateException("Inconsistent State: " + this);
+ }
+
+ return sink.send(event);
+
+ } catch (final Exception e) {
+ logger.warn("{}: cannot deliver {} over {}:{} because of {}", this, event, busType, topic, e.getMessage(),
+ e);
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void activate() {
+
+ /* policy-engine dispatch pre activate hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeActivate(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ // activate 'policy-management'
+ for (final PolicyController policyController : this.getPolicyControllers()) {
+ try {
+ policyController.unlock();
+ policyController.start();
+ } catch (final Exception e) {
+ logger.error("{}: cannot activate of policy-controller {} because of {}", this, policyController,
+ e.getMessage(), e);
+ } catch (final LinkageError e) {
+ logger.error("{}: cannot activate (rules compilation) of policy-controller {} because of {}", this,
+ policyController, e.getMessage(), e);
+ }
+ }
+
+ this.unlock();
+
+ /* policy-engine dispatch post activate hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterActivate(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void deactivate() {
+
+ /* policy-engine dispatch pre deactivate hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.beforeDeactivate(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+
+ this.lock();
+
+ for (final PolicyController policyController : this.getPolicyControllers()) {
+ try {
+ policyController.stop();
+ } catch (final Exception | LinkageError e) {
+ logger.error("{}: cannot deactivate (stop) policy-controller {} because of {}", this, policyController,
+ e.getMessage(), e);
+ }
+ }
+
+ /* policy-engine dispatch post deactivate hook */
+ for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ try {
+ if (feature.afterDeactivate(this)) {
+ return;
+ }
+ } catch (final Exception e) {
+ logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), e.getMessage(), e);
+ }
+ }
+ }
+
+ private boolean controllerConfig(PdpdConfiguration config) {
+ /* only this one supported for now */
+ final List<ControllerConfiguration> configControllers = config.getControllers();
+ if (configControllers == null || configControllers.isEmpty()) {
+ logger.info("No controller configuration provided: {}", config);
+ return false;
+ }
+
+ final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
+ boolean success = false;
+ if (!(policyControllers == null || policyControllers.isEmpty())
+ && (policyControllers.size() == configControllers.size())) {
+ success = true;
+ }
+ return success;
+ }
+
+ @Override
+ public String toString() {
+ return "PolicyEngineManager [alive=" + this.alive + ", locked=" + this.locked + "]";
+ }
+
+ // these methods may be overridden by junit tests
+
+ protected List<PolicyEngineFeatureApi> getEngineProviders() {
+ return PolicyEngineFeatureApi.providers.getList();
+ }
+
+ protected List<PolicyControllerFeatureApi> getControllerProviders() {
+ return PolicyControllerFeatureApi.providers.getList();
+ }
+
+ protected void globalInitContainer(String[] cliArgs) {
+ PolicyContainer.globalInit(cliArgs);
+ }
+
+ protected TopicEndpoint getTopicEndpointManager() {
+ return TopicEndpointManager.getManager();
+ }
+
+ protected HttpServletServerFactory getServletFactory() {
+ return HttpServletServerFactoryInstance.getServerFactory();
+ }
+
+ protected PolicyControllerFactory getControllerFactory() {
+ return PolicyController.factory;
+ }
+
+ protected void startPdpJmxListener() {
+ PdpJmxListener.start();
+ }
+
+ protected void stopPdpJmxListener() {
+ PdpJmxListener.stop();
+ }
+
+ protected Thread makeShutdownThread() {
+ return new ShutdownThread();
+ }
+
+ protected EventProtocolCoder getProtocolCoder() {
+ return EventProtocolCoder.manager;
+ }
+
+ protected SystemPersistence getPersistenceManager() {
+ return SystemPersistence.manager;
+ }
+
+ protected PolicyEngine getPolicyEngine() {
+ return PolicyEngine.manager;
+ }
+}