summaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-08-14 17:31:50 -0400
committerJim Hahn <jrh3@att.com>2019-08-15 11:23:31 -0400
commit59e9b9a8b56d563814ef21a23716959f772f9194 (patch)
treef152aea1578a82737501f56916ca07d8e7889d18 /policy-management/src/main/java/org
parenta156cf3cbad6512510ae9a02a13c0408f901c734 (diff)
Fix more sonar issues in drools-pdp
Addressed issues of cyclomatic complexity and deep nesting by refactoring code into separate methods. In some cases, had to refactor the code into nested classes to avoid passing too many parameters to the newly extracted methods. Addressed issue "too many conditionals" by breaking conditionals apart. Addressed issue "Remove usage of generic wildcard type" by eliminating "? extends" from return values. Addressed issue "Remove this use of 'Thread.sleep()'" in junit tests by introducing latches or using Awaitility. Note: this won't build until ApiUtils has been merged. Change-Id: I0d5596b4cb918a36bc22f426f426bd238195b458 Issue-ID: POLICY-1968 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-management/src/main/java/org')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java127
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java137
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java4
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java701
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java264
6 files changed, 596 insertions, 776 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
index 733a492d..a4c546f8 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
@@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
List<? extends Topic> topicEntities) {
- String propertyTopicEntityPrefix;
-
List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
if (topicEntities == null || topicEntities.isEmpty()) {
@@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
for (Topic topic : topicEntities) {
- /* source or sink ? ueb or dmaap? */
- boolean isSource = topic instanceof TopicSource;
- CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
- if (commInfra == CommInfrastructure.UEB) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.DMAAP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.NOOP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
- }
- } else {
- throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
- }
-
// 1. first the topic
String firstTopic = topic.getTopic();
+ String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
+
// 2. check if there is a custom decoder for this topic that the user prefers to use
// instead of the ones provided in the platform
- String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
-
- CustomGsonCoder customGsonCoder = null;
- if (customGson != null && !customGson.isEmpty()) {
- try {
- customGsonCoder = new CustomGsonCoder(customGson);
- } catch (IllegalArgumentException e) {
- logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
- e.getMessage(), e);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
// 3. second the list of classes associated with each topic
String eventClasses = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+ .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
if (eventClasses == null || eventClasses.isEmpty()) {
logger.warn("There are no event classes for topic {}", firstTopic);
continue;
}
- List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
+ List<PotentialCoderFilter> classes2Filters =
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
- List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
+ TopicCoderFilterConfiguration topic2Classes2Filters =
+ new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
+ topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ }
- for (String theClass : topicClasses) {
+ return topics2DecodedClasses2Filters;
+ }
+ private String getPropertyTopicPrefix(Topic topic) {
+ boolean isSource = topic instanceof TopicSource;
+ CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
+ if (commInfra == CommInfrastructure.UEB) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.DMAAP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.NOOP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
+ }
+ }
+
+ private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
+ String customGson = properties.getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+
+ CustomGsonCoder customGsonCoder = null;
+ if (customGson != null && !customGson.isEmpty()) {
+ try {
+ customGsonCoder = new CustomGsonCoder(customGson);
+ } catch (IllegalArgumentException e) {
+ logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
+ e.getMessage(), e);
+ }
+ }
+ return customGsonCoder;
+ }
- // 4. third, for each coder class, get the filter expression
+ private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
+ String eventClasses) {
- String filter = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
- + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+ List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
- JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
- PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
- classes2Filters.add(class2Filters);
- }
+ List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
- TopicCoderFilterConfiguration topic2Classes2Filters =
- new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
- topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ for (String theClass : topicClasses) {
+
+ // 4. for each coder class, get the filter expression
+
+ String filter = properties
+ .getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+
+ JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
+ PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
+ classes2Filters.add(class2Filters);
}
- return topics2DecodedClasses2Filters;
+ return classes2Filters;
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
index ca1f2283..77bfcf9f 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
@@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
@@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController {
logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
- if (newGroupId == null || newGroupId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven group-id coordinate");
- }
+ validateText(newGroupId, "Missing maven group-id coordinate");
+ validateText(newArtifactId, "Missing maven artifact-id coordinate");
+ validateText(newVersion, "Missing maven version coordinate");
- if (newArtifactId == null || newArtifactId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven artifact-id coordinate");
- }
-
- if (newVersion == null || newVersion.isEmpty()) {
- throw new IllegalArgumentException("Missing maven version coordinate");
- }
-
- if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
- || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
- || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
- throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion);
- }
+ validateHasBrain(newGroupId, newArtifactId, newVersion);
if (newGroupId.equalsIgnoreCase(this.getGroupId())
&& newArtifactId.equalsIgnoreCase(this.getArtifactId())
@@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController {
return;
}
- if (!newGroupId.equalsIgnoreCase(this.getGroupId())
- || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
- throw new IllegalArgumentException(
- "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion + " vs. " + this);
- }
+ validateNewVersion(newGroupId, newArtifactId, newVersion);
/* upgrade */
String messages = this.policyContainer.updateToVersion(newVersion);
@@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController {
logger.info("UPDATE-TO-VERSION: completed {}", this);
}
+ private void validateText(String text, String errorMessage) {
+ if (text == null || text.isEmpty()) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) {
+ if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
+ || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
+ || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
+ throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion);
+ }
+ }
+
+ private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) {
+ if (!newGroupId.equalsIgnoreCase(this.getGroupId())
+ || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
+ throw new IllegalArgumentException(
+ "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion + " vs. " + this);
+ }
+ }
+
/**
* initialize decoders for all the topics supported by this controller
* Note this is critical to be done after the Policy Container is
@@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController {
for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
String topic = coderConfig.getTopic();
- CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
- if (customGsonCoder != null
- && customGsonCoder.getClassContainer() != null
- && !customGsonCoder.getClassContainer().isEmpty()) {
-
- String customGsonCoderClass = customGsonCoder.getClassContainer();
- if (!isClass(customGsonCoderClass)) {
- throw makeRetrieveEx(customGsonCoderClass);
- } else {
- logClassFetched(customGsonCoderClass);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig);
List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
if (coderFilters == null || coderFilters.isEmpty()) {
@@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController {
}
}
+ private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) {
+ CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
+ if (customGsonCoder != null
+ && customGsonCoder.getClassContainer() != null
+ && !customGsonCoder.getClassContainer().isEmpty()) {
+
+ String customGsonCoderClass = customGsonCoder.getClassContainer();
+ if (!isClass(customGsonCoderClass)) {
+ throw makeRetrieveEx(customGsonCoderClass);
+ } else {
+ logClassFetched(customGsonCoderClass);
+ }
+ }
+ return customGsonCoder;
+ }
+
/**
* Logs an error and makes an exception for an item that cannot be retrieved.
* @param itemName the item to retrieve
@@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController {
// Broadcast
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.beforeInsert(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.beforeInsert(this, event),
+ (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean successInject = this.policyContainer.insertAll(event);
@@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController {
logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
}
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.afterInsert(this, event, successInject)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.afterInsert(this, event, successInject),
+ (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return true;
@@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController {
PolicySession session = getSession(sessionName);
KieSession kieSession = session.getKieSession();
- boolean found = false;
- for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
- for (Query q : kiePackage.getQueries()) {
- if (q.getName() != null && q.getName().equals(queryName)) {
- found = true;
- break;
- }
- }
- }
- if (!found) {
- throw new IllegalArgumentException("Invalid Query Name: " + queryName);
- }
+ validateQueryName(kieSession, queryName);
List<Object> factObjects = new ArrayList<>();
@@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController {
return factObjects;
}
+ private void validateQueryName(KieSession kieSession, String queryName) {
+ for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
+ for (Query q : kiePackage.getQueries()) {
+ if (q.getName() != null && q.getName().equals(queryName)) {
+ return;
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Invalid Query Name: " + queryName);
+ }
+
@Override
public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
String factClassName = fact.getClass().getName();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
index 89a7a420..cb4ce07e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
@@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder {
coders.put(key, coderTools);
- if (reverseCoders.containsKey(reverseKey)) {
- // There is another controller (different group id/artifact id/topic)
- // that shares the class and the topic.
-
- List<ProtocolCoderToolset> toolsets =
- reverseCoders.get(reverseKey);
- boolean present = false;
- for (ProtocolCoderToolset parserSet : toolsets) {
- // just doublecheck
- present = parserSet.getControllerId().equals(key);
- if (present) {
- /* anomaly */
- logger.error(
- "{}: unexpected toolset reverse mapping found for {}:{}: {}",
- this,
- reverseKey,
- key,
- parserSet);
- }
- }
+ addReverseCoder(coderTools, key, reverseKey);
+ }
+ }
+ private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) {
+ if (reverseCoders.containsKey(reverseKey)) {
+ // There is another controller (different group id/artifact id/topic)
+ // that shares the class and the topic.
+
+ List<ProtocolCoderToolset> toolsets =
+ reverseCoders.get(reverseKey);
+ boolean present = false;
+ for (ProtocolCoderToolset parserSet : toolsets) {
+ // just doublecheck
+ present = parserSet.getControllerId().equals(key);
if (present) {
- return;
- } else {
- logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
- toolsets.add(coderTools);
+ /* anomaly */
+ logger.error(
+ "{}: unexpected toolset reverse mapping found for {}:{}: {}",
+ this,
+ reverseKey,
+ key,
+ parserSet);
}
+ }
+
+ if (present) {
+ return;
} else {
- List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
toolsets.add(coderTools);
-
- logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
- reverseCoders.put(reverseKey, toolsets);
}
+ } else {
+ List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ toolsets.add(coderTools);
+
+ logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
+ reverseCoders.put(reverseKey, toolsets);
}
}
@@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder {
for (CoderFilters codeFilter : coderToolset.getCoders()) {
String className = codeFilter.getCodedClass();
String reverseKey = this.reverseCodersKey(topic, className);
- if (this.reverseCoders.containsKey(reverseKey)) {
- List<ProtocolCoderToolset> toolsets =
- this.reverseCoders.get(reverseKey);
- Iterator<ProtocolCoderToolset> toolsetsIter =
- toolsets.iterator();
- while (toolsetsIter.hasNext()) {
- ProtocolCoderToolset toolset = toolsetsIter.next();
- if (toolset.getControllerId().equals(key)) {
- logger.info(
- "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
- toolsetsIter.remove();
- }
- }
-
- if (this.reverseCoders.get(reverseKey).isEmpty()) {
- logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
- this.reverseCoders.remove(reverseKey);
- }
- }
+ removeReverseCoder(key, reverseKey);
}
}
}
}
+ private void removeReverseCoder(String key, String reverseKey) {
+ if (!this.reverseCoders.containsKey(reverseKey)) {
+ return;
+ }
+
+ List<ProtocolCoderToolset> toolsets =
+ this.reverseCoders.get(reverseKey);
+ Iterator<ProtocolCoderToolset> toolsetsIter =
+ toolsets.iterator();
+ while (toolsetsIter.hasNext()) {
+ ProtocolCoderToolset toolset = toolsetsIter.next();
+ if (toolset.getControllerId().equals(key)) {
+ logger.info(
+ "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
+ toolsetsIter.remove();
+ }
+ }
+
+ if (this.reverseCoders.get(reverseKey).isEmpty()) {
+ logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
+ this.reverseCoders.remove(reverseKey);
+ }
+ }
+
/**
* does it support coding.
*
@@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder {
}
for (ProtocolCoderToolset encoderSet : toolsets) {
- // figure out the right toolset
- String groupId = encoderSet.getGroupId();
- String artifactId = encoderSet.getArtifactId();
- List<CoderFilters> coderFilters = encoderSet.getCoders();
- for (CoderFilters coder : coderFilters) {
- if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
- DroolsController droolsController =
- DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
- if (droolsController.ownsCoder(
- encodedClass.getClass(), coder.getModelClassLoaderHash())) {
- droolsControllers.add(droolsController);
- }
- }
- }
+ addToolsetControllers(droolsControllers, encodedClass, encoderSet);
}
if (droolsControllers.isEmpty()) {
@@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder {
return droolsControllers;
}
+ private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass,
+ ProtocolCoderToolset encoderSet) {
+ // figure out the right toolset
+ String groupId = encoderSet.getGroupId();
+ String artifactId = encoderSet.getArtifactId();
+ List<CoderFilters> coderFilters = encoderSet.getCoders();
+ for (CoderFilters coder : coderFilters) {
+ if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
+ DroolsController droolsController =
+ DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
+ if (droolsController.ownsCoder(
+ encodedClass.getClass(), coder.getModelClassLoaderHash())) {
+ droolsControllers.add(droolsController);
+ }
+ }
+ }
+ }
+
/**
* get all filters by maven coordinates and topic.
*
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
index 17247f41..82cd015e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
@@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable {
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSource> getTopicSources();
+ List<TopicSource> getTopicSources();
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSink> getTopicSinks();
+ List<TopicSink> getTopicSinks();
/**
* Get the Drools Controller.
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 36d8ca59..32e3f674 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
+import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
@@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Is the Policy Engine running.
*/
+ @Getter
private volatile boolean alive = false;
/**
* Is the engine locked.
*/
+ @Getter
private volatile boolean locked = false;
/**
@@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Policy Engine Sources.
*/
- private List<? extends TopicSource> sources = new ArrayList<>();
+ @Getter
+ private List<TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks.
*/
- private List<? extends TopicSink> sinks = new ArrayList<>();
+ @Getter
+ private List<TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers.
*/
+ @Getter
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
@@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public synchronized void boot(String[] cliArgs) {
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeBoot(this, cliArgs)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeBoot(this, cliArgs),
+ (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
try {
@@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
}
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterBoot(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterBoot(this),
+ (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch pre configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeConfigure(this, properties)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-configure failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeConfigure(this, properties),
+ (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.properties = properties;
@@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterConfigure(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterConfigure(this),
+ (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine {
}
// feature hook
- for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
- try {
- if (controllerFeature.afterCreate(controller)) {
- return controller;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-controller-create failure because of {}", this,
- controllerFeature.getClass().getName(), e.getMessage(), e);
- }
- }
+ PolicyController controller2 = controller;
+ FeatureApiUtils.apply(getControllerProviders(),
+ feature -> feature.afterCreate(controller2),
+ (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
return controller;
}
@@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("No controller configuration has been provided");
}
- PolicyController policyController = null;
try {
final String operation = configController.getOperation();
if (operation == null || operation.isEmpty()) {
@@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("operation must be provided");
}
- try {
- policyController = getControllerFactory().get(controllerName);
- } catch (final IllegalArgumentException e) {
- // not found
- logger.warn("Policy Controller " + controllerName + " not found", e);
- }
-
+ PolicyController policyController = getController(controllerName);
if (policyController == null) {
-
- if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
- || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
- throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
- }
-
- /* Recovery case */
-
- logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
-
- final Properties controllerProperties =
- getPersistenceManager().getControllerProperties(controllerName);
-
- /*
- * returned properties cannot be null (per implementation) assert (properties !=
- * null)
- */
-
- if (controllerProperties == null) {
- throw new IllegalArgumentException(controllerName + " is invalid");
- }
-
- logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
- controllerName);
-
- /*
- * try to bring up bad controller in brainless mode, after having it
- * working, apply the new create/update operation.
- */
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
- DroolsControllerConstants.NO_GROUP_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
- DroolsControllerConstants.NO_ARTIFACT_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
- DroolsControllerConstants.NO_VERSION);
-
- policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ policyController = findController(controllerName, operation);
/* fall through to do brain update operation */
}
- switch (operation) {
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
- policyController.unlock();
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
- policyController.lock();
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
- policyController.unlock();
- break;
- default:
- final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
- + controllerName;
- logger.warn(msg);
- throw new IllegalArgumentException(msg);
- }
+ updateController(controllerName, policyController, operation, configController);
return policyController;
} catch (final Exception e) {
@@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine {
}
}
+ private PolicyController getController(final String controllerName) {
+ PolicyController policyController = null;
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+ return policyController;
+ }
+
+ private PolicyController findController(final String controllerName, final String operation) {
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it
+ * working, apply the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
+ DroolsControllerConstants.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
+ DroolsControllerConstants.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
+ DroolsControllerConstants.NO_VERSION);
+
+ return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ }
+
+ private void updateController(final String controllerName, PolicyController policyController,
+ final String operation, ControllerConfiguration configController) {
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
@Override
public synchronized boolean start() {
/* policy-engine dispatch pre start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
- boolean success = true;
if (this.locked) {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
this.alive = true;
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.waitedStart(10 * 1000L)) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ httpServer -> httpServer.waitedStart(10 * 1000L),
+ (item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine exclusively-owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Controllers */
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
- e);
- success = false;
- }
- }
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::start,
+ (item, ex) -> {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Start managed Topic Endpoints */
try {
if (!getTopicEndpointManager().start()) {
- success = false;
+ success.set(false);
}
} catch (final IllegalStateException e) {
logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
@@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine {
startPdpJmxListener();
/* policy-engine dispatch after start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStart(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
+ }
+
+ @FunctionalInterface
+ private static interface PredicateWithEx<T> {
+ public boolean test(T value) throws InterruptedException;
}
@Override
public synchronized boolean stop() {
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless of the lock state */
- boolean success = true;
if (!this.alive) {
return true;
}
this.alive = false;
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
- success = false;
- }
- }
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::stop,
+ (item, ex) -> {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Stop Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Stop Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* stop all managed topics sources and sinks */
if (!getTopicEndpointManager().stop()) {
- success = false;
+ success.set(false);
}
/* stop all unmanaged http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ HttpServletServer::stop,
+ (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
// stop JMX?
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStop(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
}
@Override
@@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine {
exitThread.start();
/* policy-engine dispatch pre shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.alive = false;
/* Shutdown Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- source.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ applyAll(this.sources,
+ TopicSource::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- sink.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ applyAll(this.sinks,
+ TopicSink::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown managed resources */
getControllerFactory().shutdown();
@@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine {
stopPdpJmxListener();
/* policy-engine dispatch post shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
try {
- if (feature.afterShutdown(this)) {
- return;
+ if (!pred.test(item)) {
+ success.set(false);
}
- } catch (final Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
+
+ } catch (InterruptedException ex) {
+ handleEx.accept(item, ex);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
}
}
+ }
- exitThread.interrupt();
- logger.info("{}: normal termination", this);
+ private <T> void applyAll(List<T> items, Consumer<T> function,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
+ try {
+ function.accept(item);
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
+ }
+ }
}
/**
@@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine {
/*
* shut down the Policy Engine owned http servers as the very last thing
*/
- for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
- try {
- httpServer.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
- httpServer, e.getMessage(), e);
- }
- }
+ applyAll(PolicyEngineManager.this.getHttpServers(),
+ HttpServletServer::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
logger.info("{}: exit", PolicyEngineManager.this);
doExit(0);
@@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public synchronized boolean lock() {
/* policy-engine dispatch pre lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.locked) {
@@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
/* policy-engine dispatch post lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterLock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized boolean unlock() {
/* policy-engine dispatch pre unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (!this.locked) {
@@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().unlock() && success;
/* policy-engine dispatch after unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public void removePolicyController(String name) {
getControllerFactory().destroy(name);
}
@@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine {
return this.properties;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSource> getSources() {
- return (List<TopicSource>) this.sources;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSink> getSinks() {
- return (List<TopicSink>) this.sinks;
- }
-
- @Override
- public List<HttpServletServer> getHttpServers() {
- return this.httpServers;
- }
-
@Override
public List<String> getFeatures() {
final List<String> features = new ArrayList<>();
@@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
/* policy-engine pre topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
+ (feature, ex) -> logger.error(
+ "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex))) {
+ return;
}
/* configuration request */
@@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine after topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
- }
+ PdpdConfiguration configuration2 = configuration;
+ FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex));
}
@Override
@@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
if (topicSinks == null || topicSinks.size() != 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine {
* Note this entry point is usually from the DRL (one of the reasons busType is String.
*/
- if (busType == null || busType.isEmpty()) {
+ if (StringUtils.isBlank(busType)) {
throw new IllegalArgumentException("Invalid Communication Infrastructure");
}
- if (topic == null || topic.isEmpty()) {
+ if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(INVALID_TOPIC_MSG);
}
@@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException(INVALID_EVENT_MSG);
}
- boolean valid = false;
- for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
- if (comm.name().equals(busType)) {
- valid = true;
- }
- }
+ boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
+ .anyMatch(name -> name.equals(busType));
if (!valid) {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
@@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized void activate() {
/* policy-engine dispatch pre activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
// activate 'policy-management'
@@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine {
this.unlock();
/* policy-engine dispatch post activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
public synchronized void deactivate() {
/* policy-engine dispatch pre deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.lock();
@@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
private boolean controllerConfig(PdpdConfiguration config) {
@@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine {
}
final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
- boolean success = false;
- if (!(policyControllers == null || policyControllers.isEmpty())
- && (policyControllers.size() == configControllers.size())) {
- success = true;
- }
- return success;
+ return (policyControllers != null && !policyControllers.isEmpty()
+ && policyControllers.size() == configControllers.size());
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
index 5d915104..aa57abaf 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
@@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.controller.DroolsControllerFactory;
@@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory;
*/
public class AggregatedPolicyController implements PolicyController, TopicListener {
+ private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
+ private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
+
/**
* Logger.
*/
@@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
/**
* Abstracted Event Sources List regardless communication technology.
*/
- private final List<? extends TopicSource> sources;
+ private final List<TopicSource> sources;
/**
* Abstracted Event Sinks List regardless communication technology.
*/
- private final List<? extends TopicSink> sinks;
+ private final List<TopicSink> sinks;
/**
* Mapping topics to sinks.
@@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean start() {
logger.info("{}: start", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.isLocked()) {
@@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
}
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean stop() {
logger.info("{}: stop", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless locked state */
@@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.stop();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void shutdown() {
logger.info("{}: shutdown", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().shutdown(this.droolsController);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void halt() {
logger.info("{}: halt", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().destroy(this.droolsController);
getPersistenceManager().deleteController(this.name);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, commType, topic, event)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, commType, topic, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
boolean success = this.droolsController.offer(topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, commType, topic, event, success)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, commType, topic, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return true;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean success = this.droolsController.offer(event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeDeliver(this, commType, topic, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeDeliver(this, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (topic == null || topic.isEmpty()) {
@@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterDeliver(this, commType, topic, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterDeliver(this, commType, topic, event, success),
+ (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean lock() {
logger.info("{}: lock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.lock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.info("{}: unlock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.unlock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSource> getTopicSources() {
+ public List<TopicSource> getTopicSources() {
return this.sources;
}
@@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSink> getTopicSinks() {
+ public List<TopicSink> getTopicSinks() {
return this.sinks;
}