aboutsummaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java701
1 files changed, 288 insertions, 413 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 36d8ca59..32e3f674 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
+import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
@@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Is the Policy Engine running.
*/
+ @Getter
private volatile boolean alive = false;
/**
* Is the engine locked.
*/
+ @Getter
private volatile boolean locked = false;
/**
@@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Policy Engine Sources.
*/
- private List<? extends TopicSource> sources = new ArrayList<>();
+ @Getter
+ private List<TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks.
*/
- private List<? extends TopicSink> sinks = new ArrayList<>();
+ @Getter
+ private List<TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers.
*/
+ @Getter
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
@@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public synchronized void boot(String[] cliArgs) {
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeBoot(this, cliArgs)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeBoot(this, cliArgs),
+ (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
try {
@@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
}
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterBoot(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterBoot(this),
+ (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch pre configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeConfigure(this, properties)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-configure failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeConfigure(this, properties),
+ (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.properties = properties;
@@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterConfigure(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterConfigure(this),
+ (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine {
}
// feature hook
- for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
- try {
- if (controllerFeature.afterCreate(controller)) {
- return controller;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-controller-create failure because of {}", this,
- controllerFeature.getClass().getName(), e.getMessage(), e);
- }
- }
+ PolicyController controller2 = controller;
+ FeatureApiUtils.apply(getControllerProviders(),
+ feature -> feature.afterCreate(controller2),
+ (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
return controller;
}
@@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("No controller configuration has been provided");
}
- PolicyController policyController = null;
try {
final String operation = configController.getOperation();
if (operation == null || operation.isEmpty()) {
@@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("operation must be provided");
}
- try {
- policyController = getControllerFactory().get(controllerName);
- } catch (final IllegalArgumentException e) {
- // not found
- logger.warn("Policy Controller " + controllerName + " not found", e);
- }
-
+ PolicyController policyController = getController(controllerName);
if (policyController == null) {
-
- if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
- || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
- throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
- }
-
- /* Recovery case */
-
- logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
-
- final Properties controllerProperties =
- getPersistenceManager().getControllerProperties(controllerName);
-
- /*
- * returned properties cannot be null (per implementation) assert (properties !=
- * null)
- */
-
- if (controllerProperties == null) {
- throw new IllegalArgumentException(controllerName + " is invalid");
- }
-
- logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
- controllerName);
-
- /*
- * try to bring up bad controller in brainless mode, after having it
- * working, apply the new create/update operation.
- */
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
- DroolsControllerConstants.NO_GROUP_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
- DroolsControllerConstants.NO_ARTIFACT_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
- DroolsControllerConstants.NO_VERSION);
-
- policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ policyController = findController(controllerName, operation);
/* fall through to do brain update operation */
}
- switch (operation) {
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
- policyController.unlock();
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
- policyController.lock();
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
- policyController.unlock();
- break;
- default:
- final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
- + controllerName;
- logger.warn(msg);
- throw new IllegalArgumentException(msg);
- }
+ updateController(controllerName, policyController, operation, configController);
return policyController;
} catch (final Exception e) {
@@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine {
}
}
+ private PolicyController getController(final String controllerName) {
+ PolicyController policyController = null;
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+ return policyController;
+ }
+
+ private PolicyController findController(final String controllerName, final String operation) {
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it
+ * working, apply the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
+ DroolsControllerConstants.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
+ DroolsControllerConstants.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
+ DroolsControllerConstants.NO_VERSION);
+
+ return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ }
+
+ private void updateController(final String controllerName, PolicyController policyController,
+ final String operation, ControllerConfiguration configController) {
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
@Override
public synchronized boolean start() {
/* policy-engine dispatch pre start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
- boolean success = true;
if (this.locked) {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
this.alive = true;
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.waitedStart(10 * 1000L)) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ httpServer -> httpServer.waitedStart(10 * 1000L),
+ (item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine exclusively-owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Controllers */
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
- e);
- success = false;
- }
- }
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::start,
+ (item, ex) -> {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Start managed Topic Endpoints */
try {
if (!getTopicEndpointManager().start()) {
- success = false;
+ success.set(false);
}
} catch (final IllegalStateException e) {
logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
@@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine {
startPdpJmxListener();
/* policy-engine dispatch after start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStart(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
+ }
+
+ @FunctionalInterface
+ private static interface PredicateWithEx<T> {
+ public boolean test(T value) throws InterruptedException;
}
@Override
public synchronized boolean stop() {
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless of the lock state */
- boolean success = true;
if (!this.alive) {
return true;
}
this.alive = false;
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
- success = false;
- }
- }
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::stop,
+ (item, ex) -> {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Stop Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Stop Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* stop all managed topics sources and sinks */
if (!getTopicEndpointManager().stop()) {
- success = false;
+ success.set(false);
}
/* stop all unmanaged http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ HttpServletServer::stop,
+ (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
// stop JMX?
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStop(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
}
@Override
@@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine {
exitThread.start();
/* policy-engine dispatch pre shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.alive = false;
/* Shutdown Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- source.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ applyAll(this.sources,
+ TopicSource::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- sink.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ applyAll(this.sinks,
+ TopicSink::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown managed resources */
getControllerFactory().shutdown();
@@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine {
stopPdpJmxListener();
/* policy-engine dispatch post shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
try {
- if (feature.afterShutdown(this)) {
- return;
+ if (!pred.test(item)) {
+ success.set(false);
}
- } catch (final Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
+
+ } catch (InterruptedException ex) {
+ handleEx.accept(item, ex);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
}
}
+ }
- exitThread.interrupt();
- logger.info("{}: normal termination", this);
+ private <T> void applyAll(List<T> items, Consumer<T> function,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
+ try {
+ function.accept(item);
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
+ }
+ }
}
/**
@@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine {
/*
* shut down the Policy Engine owned http servers as the very last thing
*/
- for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
- try {
- httpServer.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
- httpServer, e.getMessage(), e);
- }
- }
+ applyAll(PolicyEngineManager.this.getHttpServers(),
+ HttpServletServer::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
logger.info("{}: exit", PolicyEngineManager.this);
doExit(0);
@@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public synchronized boolean lock() {
/* policy-engine dispatch pre lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.locked) {
@@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
/* policy-engine dispatch post lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterLock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized boolean unlock() {
/* policy-engine dispatch pre unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (!this.locked) {
@@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().unlock() && success;
/* policy-engine dispatch after unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public void removePolicyController(String name) {
getControllerFactory().destroy(name);
}
@@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine {
return this.properties;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSource> getSources() {
- return (List<TopicSource>) this.sources;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSink> getSinks() {
- return (List<TopicSink>) this.sinks;
- }
-
- @Override
- public List<HttpServletServer> getHttpServers() {
- return this.httpServers;
- }
-
@Override
public List<String> getFeatures() {
final List<String> features = new ArrayList<>();
@@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
/* policy-engine pre topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
+ (feature, ex) -> logger.error(
+ "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex))) {
+ return;
}
/* configuration request */
@@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine after topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
- }
+ PdpdConfiguration configuration2 = configuration;
+ FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex));
}
@Override
@@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
if (topicSinks == null || topicSinks.size() != 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine {
* Note this entry point is usually from the DRL (one of the reasons busType is String.
*/
- if (busType == null || busType.isEmpty()) {
+ if (StringUtils.isBlank(busType)) {
throw new IllegalArgumentException("Invalid Communication Infrastructure");
}
- if (topic == null || topic.isEmpty()) {
+ if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(INVALID_TOPIC_MSG);
}
@@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException(INVALID_EVENT_MSG);
}
- boolean valid = false;
- for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
- if (comm.name().equals(busType)) {
- valid = true;
- }
- }
+ boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
+ .anyMatch(name -> name.equals(busType));
if (!valid) {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
@@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized void activate() {
/* policy-engine dispatch pre activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
// activate 'policy-management'
@@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine {
this.unlock();
/* policy-engine dispatch post activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
public synchronized void deactivate() {
/* policy-engine dispatch pre deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.lock();
@@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
private boolean controllerConfig(PdpdConfiguration config) {
@@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine {
}
final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
- boolean success = false;
- if (!(policyControllers == null || policyControllers.isEmpty())
- && (policyControllers.size() == configControllers.size())) {
- success = true;
- }
- return success;
+ return (policyControllers != null && !policyControllers.isEmpty()
+ && policyControllers.size() == configControllers.size());
}
@Override