diff options
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r-- | policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java | 701 |
1 files changed, 288 insertions, 413 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 36d8ca59..32e3f674 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; +import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Stream; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.gson.annotation.GsonJsonProperty; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; @@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine { /** * Is the Policy Engine running. */ + @Getter private volatile boolean alive = false; /** * Is the engine locked. */ + @Getter private volatile boolean locked = false; /** @@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine { /** * Policy Engine Sources. */ - private List<? extends TopicSource> sources = new ArrayList<>(); + @Getter + private List<TopicSource> sources = new ArrayList<>(); /** * Policy Engine Sinks. */ - private List<? extends TopicSink> sinks = new ArrayList<>(); + @Getter + private List<TopicSink> sinks = new ArrayList<>(); /** * Policy Engine HTTP Servers. */ + @Getter private List<HttpServletServer> httpServers = new ArrayList<>(); /** @@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine { @Override public synchronized void boot(String[] cliArgs) { - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeBoot(this, cliArgs)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeBoot(this, cliArgs), + (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } try { @@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e); } - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterBoot(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterBoot(this), + (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch pre configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeConfigure(this, properties)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-configure failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeConfigure(this, properties), + (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.properties = properties; @@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterConfigure(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterConfigure(this), + (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine { } // feature hook - for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) { - try { - if (controllerFeature.afterCreate(controller)) { - return controller; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-controller-create failure because of {}", this, - controllerFeature.getClass().getName(), e.getMessage(), e); - } - } + PolicyController controller2 = controller; + FeatureApiUtils.apply(getControllerProviders(), + feature -> feature.afterCreate(controller2), + (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); return controller; } @@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("No controller configuration has been provided"); } - PolicyController policyController = null; try { final String operation = configController.getOperation(); if (operation == null || operation.isEmpty()) { @@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("operation must be provided"); } - try { - policyController = getControllerFactory().get(controllerName); - } catch (final IllegalArgumentException e) { - // not found - logger.warn("Policy Controller " + controllerName + " not found", e); - } - + PolicyController policyController = getController(controllerName); if (policyController == null) { - - if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) - || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { - throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); - } - - /* Recovery case */ - - logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); - - final Properties controllerProperties = - getPersistenceManager().getControllerProperties(controllerName); - - /* - * returned properties cannot be null (per implementation) assert (properties != - * null) - */ - - if (controllerProperties == null) { - throw new IllegalArgumentException(controllerName + " is invalid"); - } - - logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", - controllerName); - - /* - * try to bring up bad controller in brainless mode, after having it - * working, apply the new create/update operation. - */ - controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, - DroolsControllerConstants.NO_GROUP_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, - DroolsControllerConstants.NO_ARTIFACT_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, - DroolsControllerConstants.NO_VERSION); - - policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties); + policyController = findController(controllerName, operation); /* fall through to do brain update operation */ } - switch (operation) { - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: - policyController.unlock(); - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: - policyController.lock(); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: - policyController.unlock(); - break; - default: - final String msg = "Controller Operation Configuration is not supported: " + operation + " for " - + controllerName; - logger.warn(msg); - throw new IllegalArgumentException(msg); - } + updateController(controllerName, policyController, operation, configController); return policyController; } catch (final Exception e) { @@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine { } } + private PolicyController getController(final String controllerName) { + PolicyController policyController = null; + try { + policyController = getControllerFactory().get(controllerName); + } catch (final IllegalArgumentException e) { + // not found + logger.warn("Policy Controller " + controllerName + " not found", e); + } + return policyController; + } + + private PolicyController findController(final String controllerName, final String operation) { + if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) + || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { + throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); + } + + /* Recovery case */ + + logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); + + final Properties controllerProperties = + getPersistenceManager().getControllerProperties(controllerName); + + /* + * returned properties cannot be null (per implementation) assert (properties != + * null) + */ + + if (controllerProperties == null) { + throw new IllegalArgumentException(controllerName + " is invalid"); + } + + logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", + controllerName); + + /* + * try to bring up bad controller in brainless mode, after having it + * working, apply the new create/update operation. + */ + controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, + DroolsControllerConstants.NO_GROUP_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, + DroolsControllerConstants.NO_ARTIFACT_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, + DroolsControllerConstants.NO_VERSION); + + return getPolicyEngine().createPolicyController(controllerName, controllerProperties); + } + + private void updateController(final String controllerName, PolicyController policyController, + final String operation, ControllerConfiguration configController) { + switch (operation) { + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: + policyController.unlock(); + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: + policyController.lock(); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: + policyController.unlock(); + break; + default: + final String msg = "Controller Operation Configuration is not supported: " + operation + " for " + + controllerName; + logger.warn(msg); + throw new IllegalArgumentException(msg); + } + } + @Override public synchronized boolean start() { /* policy-engine dispatch pre start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStart(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStart(this), + (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } - boolean success = true; if (this.locked) { throw new IllegalStateException(ENGINE_LOCKED_MSG); } this.alive = true; + AtomicReference<Boolean> success = new AtomicReference<>(true); + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.waitedStart(10 * 1000L)) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + httpServer -> httpServer.waitedStart(10 * 1000L), + (item, ex) -> logger.error("{}: cannot start http-server {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine exclusively-owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::start, + (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::start, + (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Controllers */ - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(), - e); - success = false; - } - } + attempt(success, getControllerFactory().inventory(), + PolicyController::start, + (item, ex) -> { + logger.error("{}: cannot start policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Start managed Topic Endpoints */ try { if (!getTopicEndpointManager().start()) { - success = false; + success.set(false); } } catch (final IllegalStateException e) { logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e); @@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine { startPdpJmxListener(); /* policy-engine dispatch after start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStart(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStart(this), + (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); + } + + @FunctionalInterface + private static interface PredicateWithEx<T> { + public boolean test(T value) throws InterruptedException; } @Override public synchronized boolean stop() { /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStop(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStop(this), + (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } /* stop regardless of the lock state */ - boolean success = true; if (!this.alive) { return true; } this.alive = false; - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e); - success = false; - } - } + AtomicReference<Boolean> success = new AtomicReference<>(true); + + attempt(success, getControllerFactory().inventory(), + PolicyController::stop, + (item, ex) -> { + logger.error("{}: cannot stop policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Stop Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::stop, + (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Stop Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::stop, + (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* stop all managed topics sources and sinks */ if (!getTopicEndpointManager().stop()) { - success = false; + success.set(false); } /* stop all unmanaged http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + HttpServletServer::stop, + (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, + ex.getMessage(), ex)); // stop JMX? /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStop(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStop(this), + (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); } @Override @@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine { exitThread.start(); /* policy-engine dispatch pre shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeShutdown(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeShutdown(this), + (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.alive = false; /* Shutdown Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - source.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + applyAll(this.sources, + TopicSource::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - sink.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + applyAll(this.sinks, + TopicSink::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown managed resources */ getControllerFactory().shutdown(); @@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine { stopPdpJmxListener(); /* policy-engine dispatch post shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterShutdown(this), + (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); + + exitThread.interrupt(); + logger.info("{}: normal termination", this); + } + + private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { try { - if (feature.afterShutdown(this)) { - return; + if (!pred.test(item)) { + success.set(false); } - } catch (final Exception e) { - logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); + + } catch (InterruptedException ex) { + handleEx.accept(item, ex); + Thread.currentThread().interrupt(); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); } } + } - exitThread.interrupt(); - logger.info("{}: normal termination", this); + private <T> void applyAll(List<T> items, Consumer<T> function, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { + try { + function.accept(item); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); + } + } } /** @@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine { /* * shut down the Policy Engine owned http servers as the very last thing */ - for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) { - try { - httpServer.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this, - httpServer, e.getMessage(), e); - } - } + applyAll(PolicyEngineManager.this.getHttpServers(), + HttpServletServer::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item, + ex.getMessage(), ex)); logger.info("{}: exit", PolicyEngineManager.this); doExit(0); @@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine { } @Override - public boolean isAlive() { - return this.alive; - } - - @Override public synchronized boolean lock() { /* policy-engine dispatch pre lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeLock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeLock(this), + (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (this.locked) { @@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; /* policy-engine dispatch post lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterLock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterLock(this), + (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized boolean unlock() { /* policy-engine dispatch pre unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeUnlock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeUnlock(this), + (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (!this.locked) { @@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().unlock() && success; /* policy-engine dispatch after unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterUnlock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterUnlock(this), + (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @Override - public boolean isLocked() { - return this.locked; - } - - @Override public void removePolicyController(String name) { getControllerFactory().destroy(name); } @@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine { return this.properties; } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSource> getSources() { - return (List<TopicSource>) this.sources; - } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSink> getSinks() { - return (List<TopicSink>) this.sinks; - } - - @Override - public List<HttpServletServer> getHttpServers() { - return this.httpServers; - } - @Override public List<String> getFeatures() { final List<String> features = new ArrayList<>(); @@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine { @Override public void onTopicEvent(CommInfrastructure commType, String topic, String event) { /* policy-engine pre topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.beforeOnTopicEvent(this, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } + if (FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.beforeOnTopicEvent(this, commType, topic, event), + (feature, ex) -> logger.error( + "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex))) { + return; } /* configuration request */ @@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine after topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } - } + PdpdConfiguration configuration2 = configuration; + FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event), + (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex)); } @Override @@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalStateException(ENGINE_LOCKED_MSG); } - final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); + final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); if (topicSinks == null || topicSinks.size() != 1) { throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks); } @@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine { * Note this entry point is usually from the DRL (one of the reasons busType is String. */ - if (busType == null || busType.isEmpty()) { + if (StringUtils.isBlank(busType)) { throw new IllegalArgumentException("Invalid Communication Infrastructure"); } - if (topic == null || topic.isEmpty()) { + if (StringUtils.isBlank(topic)) { throw new IllegalArgumentException(INVALID_TOPIC_MSG); } @@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException(INVALID_EVENT_MSG); } - boolean valid = false; - for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) { - if (comm.name().equals(busType)) { - valid = true; - } - } + boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name) + .anyMatch(name -> name.equals(busType)); if (!valid) { throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType); @@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized void activate() { /* policy-engine dispatch pre activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeActivate(this), + (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } // activate 'policy-management' @@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine { this.unlock(); /* policy-engine dispatch post activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterActivate(this), + (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override public synchronized void deactivate() { /* policy-engine dispatch pre deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.lock(); @@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } private boolean controllerConfig(PdpdConfiguration config) { @@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine { } final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers()); - boolean success = false; - if (!(policyControllers == null || policyControllers.isEmpty()) - && (policyControllers.size() == configControllers.size())) { - success = true; - } - return success; + return (policyControllers != null && !policyControllers.isEmpty() + && policyControllers.size() == configControllers.size()); } @Override |