aboutsummaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/onap')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java127
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java137
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java4
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java701
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java264
6 files changed, 596 insertions, 776 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
index 733a492d..a4c546f8 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
@@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
List<? extends Topic> topicEntities) {
- String propertyTopicEntityPrefix;
-
List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
if (topicEntities == null || topicEntities.isEmpty()) {
@@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
for (Topic topic : topicEntities) {
- /* source or sink ? ueb or dmaap? */
- boolean isSource = topic instanceof TopicSource;
- CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
- if (commInfra == CommInfrastructure.UEB) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.DMAAP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.NOOP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
- }
- } else {
- throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
- }
-
// 1. first the topic
String firstTopic = topic.getTopic();
+ String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
+
// 2. check if there is a custom decoder for this topic that the user prefers to use
// instead of the ones provided in the platform
- String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
-
- CustomGsonCoder customGsonCoder = null;
- if (customGson != null && !customGson.isEmpty()) {
- try {
- customGsonCoder = new CustomGsonCoder(customGson);
- } catch (IllegalArgumentException e) {
- logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
- e.getMessage(), e);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
// 3. second the list of classes associated with each topic
String eventClasses = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+ .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
if (eventClasses == null || eventClasses.isEmpty()) {
logger.warn("There are no event classes for topic {}", firstTopic);
continue;
}
- List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
+ List<PotentialCoderFilter> classes2Filters =
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
- List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
+ TopicCoderFilterConfiguration topic2Classes2Filters =
+ new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
+ topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ }
- for (String theClass : topicClasses) {
+ return topics2DecodedClasses2Filters;
+ }
+ private String getPropertyTopicPrefix(Topic topic) {
+ boolean isSource = topic instanceof TopicSource;
+ CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
+ if (commInfra == CommInfrastructure.UEB) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.DMAAP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.NOOP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
+ }
+ }
+
+ private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
+ String customGson = properties.getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+
+ CustomGsonCoder customGsonCoder = null;
+ if (customGson != null && !customGson.isEmpty()) {
+ try {
+ customGsonCoder = new CustomGsonCoder(customGson);
+ } catch (IllegalArgumentException e) {
+ logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
+ e.getMessage(), e);
+ }
+ }
+ return customGsonCoder;
+ }
- // 4. third, for each coder class, get the filter expression
+ private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
+ String eventClasses) {
- String filter = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
- + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+ List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
- JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
- PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
- classes2Filters.add(class2Filters);
- }
+ List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
- TopicCoderFilterConfiguration topic2Classes2Filters =
- new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
- topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ for (String theClass : topicClasses) {
+
+ // 4. for each coder class, get the filter expression
+
+ String filter = properties
+ .getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+
+ JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
+ PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
+ classes2Filters.add(class2Filters);
}
- return topics2DecodedClasses2Filters;
+ return classes2Filters;
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
index ca1f2283..77bfcf9f 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
@@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
@@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController {
logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
- if (newGroupId == null || newGroupId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven group-id coordinate");
- }
+ validateText(newGroupId, "Missing maven group-id coordinate");
+ validateText(newArtifactId, "Missing maven artifact-id coordinate");
+ validateText(newVersion, "Missing maven version coordinate");
- if (newArtifactId == null || newArtifactId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven artifact-id coordinate");
- }
-
- if (newVersion == null || newVersion.isEmpty()) {
- throw new IllegalArgumentException("Missing maven version coordinate");
- }
-
- if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
- || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
- || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
- throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion);
- }
+ validateHasBrain(newGroupId, newArtifactId, newVersion);
if (newGroupId.equalsIgnoreCase(this.getGroupId())
&& newArtifactId.equalsIgnoreCase(this.getArtifactId())
@@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController {
return;
}
- if (!newGroupId.equalsIgnoreCase(this.getGroupId())
- || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
- throw new IllegalArgumentException(
- "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion + " vs. " + this);
- }
+ validateNewVersion(newGroupId, newArtifactId, newVersion);
/* upgrade */
String messages = this.policyContainer.updateToVersion(newVersion);
@@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController {
logger.info("UPDATE-TO-VERSION: completed {}", this);
}
+ private void validateText(String text, String errorMessage) {
+ if (text == null || text.isEmpty()) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) {
+ if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
+ || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
+ || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
+ throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion);
+ }
+ }
+
+ private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) {
+ if (!newGroupId.equalsIgnoreCase(this.getGroupId())
+ || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
+ throw new IllegalArgumentException(
+ "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion + " vs. " + this);
+ }
+ }
+
/**
* initialize decoders for all the topics supported by this controller
* Note this is critical to be done after the Policy Container is
@@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController {
for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
String topic = coderConfig.getTopic();
- CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
- if (customGsonCoder != null
- && customGsonCoder.getClassContainer() != null
- && !customGsonCoder.getClassContainer().isEmpty()) {
-
- String customGsonCoderClass = customGsonCoder.getClassContainer();
- if (!isClass(customGsonCoderClass)) {
- throw makeRetrieveEx(customGsonCoderClass);
- } else {
- logClassFetched(customGsonCoderClass);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig);
List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
if (coderFilters == null || coderFilters.isEmpty()) {
@@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController {
}
}
+ private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) {
+ CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
+ if (customGsonCoder != null
+ && customGsonCoder.getClassContainer() != null
+ && !customGsonCoder.getClassContainer().isEmpty()) {
+
+ String customGsonCoderClass = customGsonCoder.getClassContainer();
+ if (!isClass(customGsonCoderClass)) {
+ throw makeRetrieveEx(customGsonCoderClass);
+ } else {
+ logClassFetched(customGsonCoderClass);
+ }
+ }
+ return customGsonCoder;
+ }
+
/**
* Logs an error and makes an exception for an item that cannot be retrieved.
* @param itemName the item to retrieve
@@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController {
// Broadcast
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.beforeInsert(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.beforeInsert(this, event),
+ (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean successInject = this.policyContainer.insertAll(event);
@@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController {
logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
}
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.afterInsert(this, event, successInject)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.afterInsert(this, event, successInject),
+ (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return true;
@@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController {
PolicySession session = getSession(sessionName);
KieSession kieSession = session.getKieSession();
- boolean found = false;
- for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
- for (Query q : kiePackage.getQueries()) {
- if (q.getName() != null && q.getName().equals(queryName)) {
- found = true;
- break;
- }
- }
- }
- if (!found) {
- throw new IllegalArgumentException("Invalid Query Name: " + queryName);
- }
+ validateQueryName(kieSession, queryName);
List<Object> factObjects = new ArrayList<>();
@@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController {
return factObjects;
}
+ private void validateQueryName(KieSession kieSession, String queryName) {
+ for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
+ for (Query q : kiePackage.getQueries()) {
+ if (q.getName() != null && q.getName().equals(queryName)) {
+ return;
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Invalid Query Name: " + queryName);
+ }
+
@Override
public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
String factClassName = fact.getClass().getName();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
index 89a7a420..cb4ce07e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
@@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder {
coders.put(key, coderTools);
- if (reverseCoders.containsKey(reverseKey)) {
- // There is another controller (different group id/artifact id/topic)
- // that shares the class and the topic.
-
- List<ProtocolCoderToolset> toolsets =
- reverseCoders.get(reverseKey);
- boolean present = false;
- for (ProtocolCoderToolset parserSet : toolsets) {
- // just doublecheck
- present = parserSet.getControllerId().equals(key);
- if (present) {
- /* anomaly */
- logger.error(
- "{}: unexpected toolset reverse mapping found for {}:{}: {}",
- this,
- reverseKey,
- key,
- parserSet);
- }
- }
+ addReverseCoder(coderTools, key, reverseKey);
+ }
+ }
+ private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) {
+ if (reverseCoders.containsKey(reverseKey)) {
+ // There is another controller (different group id/artifact id/topic)
+ // that shares the class and the topic.
+
+ List<ProtocolCoderToolset> toolsets =
+ reverseCoders.get(reverseKey);
+ boolean present = false;
+ for (ProtocolCoderToolset parserSet : toolsets) {
+ // just doublecheck
+ present = parserSet.getControllerId().equals(key);
if (present) {
- return;
- } else {
- logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
- toolsets.add(coderTools);
+ /* anomaly */
+ logger.error(
+ "{}: unexpected toolset reverse mapping found for {}:{}: {}",
+ this,
+ reverseKey,
+ key,
+ parserSet);
}
+ }
+
+ if (present) {
+ return;
} else {
- List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
toolsets.add(coderTools);
-
- logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
- reverseCoders.put(reverseKey, toolsets);
}
+ } else {
+ List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ toolsets.add(coderTools);
+
+ logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
+ reverseCoders.put(reverseKey, toolsets);
}
}
@@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder {
for (CoderFilters codeFilter : coderToolset.getCoders()) {
String className = codeFilter.getCodedClass();
String reverseKey = this.reverseCodersKey(topic, className);
- if (this.reverseCoders.containsKey(reverseKey)) {
- List<ProtocolCoderToolset> toolsets =
- this.reverseCoders.get(reverseKey);
- Iterator<ProtocolCoderToolset> toolsetsIter =
- toolsets.iterator();
- while (toolsetsIter.hasNext()) {
- ProtocolCoderToolset toolset = toolsetsIter.next();
- if (toolset.getControllerId().equals(key)) {
- logger.info(
- "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
- toolsetsIter.remove();
- }
- }
-
- if (this.reverseCoders.get(reverseKey).isEmpty()) {
- logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
- this.reverseCoders.remove(reverseKey);
- }
- }
+ removeReverseCoder(key, reverseKey);
}
}
}
}
+ private void removeReverseCoder(String key, String reverseKey) {
+ if (!this.reverseCoders.containsKey(reverseKey)) {
+ return;
+ }
+
+ List<ProtocolCoderToolset> toolsets =
+ this.reverseCoders.get(reverseKey);
+ Iterator<ProtocolCoderToolset> toolsetsIter =
+ toolsets.iterator();
+ while (toolsetsIter.hasNext()) {
+ ProtocolCoderToolset toolset = toolsetsIter.next();
+ if (toolset.getControllerId().equals(key)) {
+ logger.info(
+ "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
+ toolsetsIter.remove();
+ }
+ }
+
+ if (this.reverseCoders.get(reverseKey).isEmpty()) {
+ logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
+ this.reverseCoders.remove(reverseKey);
+ }
+ }
+
/**
* does it support coding.
*
@@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder {
}
for (ProtocolCoderToolset encoderSet : toolsets) {
- // figure out the right toolset
- String groupId = encoderSet.getGroupId();
- String artifactId = encoderSet.getArtifactId();
- List<CoderFilters> coderFilters = encoderSet.getCoders();
- for (CoderFilters coder : coderFilters) {
- if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
- DroolsController droolsController =
- DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
- if (droolsController.ownsCoder(
- encodedClass.getClass(), coder.getModelClassLoaderHash())) {
- droolsControllers.add(droolsController);
- }
- }
- }
+ addToolsetControllers(droolsControllers, encodedClass, encoderSet);
}
if (droolsControllers.isEmpty()) {
@@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder {
return droolsControllers;
}
+ private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass,
+ ProtocolCoderToolset encoderSet) {
+ // figure out the right toolset
+ String groupId = encoderSet.getGroupId();
+ String artifactId = encoderSet.getArtifactId();
+ List<CoderFilters> coderFilters = encoderSet.getCoders();
+ for (CoderFilters coder : coderFilters) {
+ if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
+ DroolsController droolsController =
+ DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
+ if (droolsController.ownsCoder(
+ encodedClass.getClass(), coder.getModelClassLoaderHash())) {
+ droolsControllers.add(droolsController);
+ }
+ }
+ }
+ }
+
/**
* get all filters by maven coordinates and topic.
*
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
index 17247f41..82cd015e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
@@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable {
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSource> getTopicSources();
+ List<TopicSource> getTopicSources();
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSink> getTopicSinks();
+ List<TopicSink> getTopicSinks();
/**
* Get the Drools Controller.
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 36d8ca59..32e3f674 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
+import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
@@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Is the Policy Engine running.
*/
+ @Getter
private volatile boolean alive = false;
/**
* Is the engine locked.
*/
+ @Getter
private volatile boolean locked = false;
/**
@@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Policy Engine Sources.
*/
- private List<? extends TopicSource> sources = new ArrayList<>();
+ @Getter
+ private List<TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks.
*/
- private List<? extends TopicSink> sinks = new ArrayList<>();
+ @Getter
+ private List<TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers.
*/
+ @Getter
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
@@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public synchronized void boot(String[] cliArgs) {
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeBoot(this, cliArgs)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeBoot(this, cliArgs),
+ (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
try {
@@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
}
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterBoot(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterBoot(this),
+ (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch pre configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeConfigure(this, properties)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-configure failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeConfigure(this, properties),
+ (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.properties = properties;
@@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterConfigure(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterConfigure(this),
+ (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine {
}
// feature hook
- for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
- try {
- if (controllerFeature.afterCreate(controller)) {
- return controller;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-controller-create failure because of {}", this,
- controllerFeature.getClass().getName(), e.getMessage(), e);
- }
- }
+ PolicyController controller2 = controller;
+ FeatureApiUtils.apply(getControllerProviders(),
+ feature -> feature.afterCreate(controller2),
+ (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
return controller;
}
@@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("No controller configuration has been provided");
}
- PolicyController policyController = null;
try {
final String operation = configController.getOperation();
if (operation == null || operation.isEmpty()) {
@@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("operation must be provided");
}
- try {
- policyController = getControllerFactory().get(controllerName);
- } catch (final IllegalArgumentException e) {
- // not found
- logger.warn("Policy Controller " + controllerName + " not found", e);
- }
-
+ PolicyController policyController = getController(controllerName);
if (policyController == null) {
-
- if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
- || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
- throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
- }
-
- /* Recovery case */
-
- logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
-
- final Properties controllerProperties =
- getPersistenceManager().getControllerProperties(controllerName);
-
- /*
- * returned properties cannot be null (per implementation) assert (properties !=
- * null)
- */
-
- if (controllerProperties == null) {
- throw new IllegalArgumentException(controllerName + " is invalid");
- }
-
- logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
- controllerName);
-
- /*
- * try to bring up bad controller in brainless mode, after having it
- * working, apply the new create/update operation.
- */
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
- DroolsControllerConstants.NO_GROUP_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
- DroolsControllerConstants.NO_ARTIFACT_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
- DroolsControllerConstants.NO_VERSION);
-
- policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ policyController = findController(controllerName, operation);
/* fall through to do brain update operation */
}
- switch (operation) {
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
- policyController.unlock();
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
- policyController.lock();
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
- policyController.unlock();
- break;
- default:
- final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
- + controllerName;
- logger.warn(msg);
- throw new IllegalArgumentException(msg);
- }
+ updateController(controllerName, policyController, operation, configController);
return policyController;
} catch (final Exception e) {
@@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine {
}
}
+ private PolicyController getController(final String controllerName) {
+ PolicyController policyController = null;
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+ return policyController;
+ }
+
+ private PolicyController findController(final String controllerName, final String operation) {
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it
+ * working, apply the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
+ DroolsControllerConstants.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
+ DroolsControllerConstants.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
+ DroolsControllerConstants.NO_VERSION);
+
+ return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ }
+
+ private void updateController(final String controllerName, PolicyController policyController,
+ final String operation, ControllerConfiguration configController) {
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
@Override
public synchronized boolean start() {
/* policy-engine dispatch pre start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
- boolean success = true;
if (this.locked) {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
this.alive = true;
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.waitedStart(10 * 1000L)) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ httpServer -> httpServer.waitedStart(10 * 1000L),
+ (item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine exclusively-owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Controllers */
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
- e);
- success = false;
- }
- }
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::start,
+ (item, ex) -> {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Start managed Topic Endpoints */
try {
if (!getTopicEndpointManager().start()) {
- success = false;
+ success.set(false);
}
} catch (final IllegalStateException e) {
logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
@@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine {
startPdpJmxListener();
/* policy-engine dispatch after start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStart(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
+ }
+
+ @FunctionalInterface
+ private static interface PredicateWithEx<T> {
+ public boolean test(T value) throws InterruptedException;
}
@Override
public synchronized boolean stop() {
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless of the lock state */
- boolean success = true;
if (!this.alive) {
return true;
}
this.alive = false;
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
- success = false;
- }
- }
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::stop,
+ (item, ex) -> {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Stop Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Stop Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* stop all managed topics sources and sinks */
if (!getTopicEndpointManager().stop()) {
- success = false;
+ success.set(false);
}
/* stop all unmanaged http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ HttpServletServer::stop,
+ (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
// stop JMX?
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStop(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
}
@Override
@@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine {
exitThread.start();
/* policy-engine dispatch pre shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.alive = false;
/* Shutdown Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- source.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ applyAll(this.sources,
+ TopicSource::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- sink.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ applyAll(this.sinks,
+ TopicSink::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown managed resources */
getControllerFactory().shutdown();
@@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine {
stopPdpJmxListener();
/* policy-engine dispatch post shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
try {
- if (feature.afterShutdown(this)) {
- return;
+ if (!pred.test(item)) {
+ success.set(false);
}
- } catch (final Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
+
+ } catch (InterruptedException ex) {
+ handleEx.accept(item, ex);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
}
}
+ }
- exitThread.interrupt();
- logger.info("{}: normal termination", this);
+ private <T> void applyAll(List<T> items, Consumer<T> function,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
+ try {
+ function.accept(item);
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
+ }
+ }
}
/**
@@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine {
/*
* shut down the Policy Engine owned http servers as the very last thing
*/
- for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
- try {
- httpServer.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
- httpServer, e.getMessage(), e);
- }
- }
+ applyAll(PolicyEngineManager.this.getHttpServers(),
+ HttpServletServer::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
logger.info("{}: exit", PolicyEngineManager.this);
doExit(0);
@@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public synchronized boolean lock() {
/* policy-engine dispatch pre lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.locked) {
@@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
/* policy-engine dispatch post lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterLock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized boolean unlock() {
/* policy-engine dispatch pre unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (!this.locked) {
@@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().unlock() && success;
/* policy-engine dispatch after unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public void removePolicyController(String name) {
getControllerFactory().destroy(name);
}
@@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine {
return this.properties;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSource> getSources() {
- return (List<TopicSource>) this.sources;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSink> getSinks() {
- return (List<TopicSink>) this.sinks;
- }
-
- @Override
- public List<HttpServletServer> getHttpServers() {
- return this.httpServers;
- }
-
@Override
public List<String> getFeatures() {
final List<String> features = new ArrayList<>();
@@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
/* policy-engine pre topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
+ (feature, ex) -> logger.error(
+ "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex))) {
+ return;
}
/* configuration request */
@@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine after topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
- }
+ PdpdConfiguration configuration2 = configuration;
+ FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex));
}
@Override
@@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
if (topicSinks == null || topicSinks.size() != 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine {
* Note this entry point is usually from the DRL (one of the reasons busType is String.
*/
- if (busType == null || busType.isEmpty()) {
+ if (StringUtils.isBlank(busType)) {
throw new IllegalArgumentException("Invalid Communication Infrastructure");
}
- if (topic == null || topic.isEmpty()) {
+ if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(INVALID_TOPIC_MSG);
}
@@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException(INVALID_EVENT_MSG);
}
- boolean valid = false;
- for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
- if (comm.name().equals(busType)) {
- valid = true;
- }
- }
+ boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
+ .anyMatch(name -> name.equals(busType));
if (!valid) {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
@@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized void activate() {
/* policy-engine dispatch pre activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
// activate 'policy-management'
@@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine {
this.unlock();
/* policy-engine dispatch post activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
public synchronized void deactivate() {
/* policy-engine dispatch pre deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.lock();
@@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
private boolean controllerConfig(PdpdConfiguration config) {
@@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine {
}
final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
- boolean success = false;
- if (!(policyControllers == null || policyControllers.isEmpty())
- && (policyControllers.size() == configControllers.size())) {
- success = true;
- }
- return success;
+ return (policyControllers != null && !policyControllers.isEmpty()
+ && policyControllers.size() == configControllers.size());
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
index 5d915104..aa57abaf 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
@@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.controller.DroolsControllerFactory;
@@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory;
*/
public class AggregatedPolicyController implements PolicyController, TopicListener {
+ private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
+ private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
+
/**
* Logger.
*/
@@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
/**
* Abstracted Event Sources List regardless communication technology.
*/
- private final List<? extends TopicSource> sources;
+ private final List<TopicSource> sources;
/**
* Abstracted Event Sinks List regardless communication technology.
*/
- private final List<? extends TopicSink> sinks;
+ private final List<TopicSink> sinks;
/**
* Mapping topics to sinks.
@@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean start() {
logger.info("{}: start", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.isLocked()) {
@@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
}
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean stop() {
logger.info("{}: stop", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless locked state */
@@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.stop();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void shutdown() {
logger.info("{}: shutdown", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().shutdown(this.droolsController);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void halt() {
logger.info("{}: halt", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().destroy(this.droolsController);
getPersistenceManager().deleteController(this.name);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, commType, topic, event)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, commType, topic, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
boolean success = this.droolsController.offer(topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, commType, topic, event, success)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, commType, topic, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return true;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean success = this.droolsController.offer(event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeDeliver(this, commType, topic, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeDeliver(this, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (topic == null || topic.isEmpty()) {
@@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterDeliver(this, commType, topic, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterDeliver(this, commType, topic, event, success),
+ (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean lock() {
logger.info("{}: lock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.lock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.info("{}: unlock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.unlock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSource> getTopicSources() {
+ public List<TopicSource> getTopicSources() {
return this.sources;
}
@@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSink> getTopicSinks() {
+ public List<TopicSink> getTopicSinks() {
return this.sinks;
}