summaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java')
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java56
1 files changed, 42 insertions, 14 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 32e3f674..766848c6 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -506,20 +506,6 @@ class PolicyEngineManager implements PolicyEngine {
(item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
this, item, ex.getMessage(), ex));
- /* Start Policy Engine exclusively-owned (unmanaged) sources */
-
- attempt(success, this.sources,
- TopicSource::start,
- (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
- this, item, ex.getMessage(), ex));
-
- /* Start Policy Engine owned (unmanaged) sinks */
-
- attempt(success, this.sinks,
- TopicSink::start,
- (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
- this, item, ex.getMessage(), ex));
-
/* Start Policy Controllers */
attempt(success, getControllerFactory().inventory(),
@@ -554,6 +540,48 @@ class PolicyEngineManager implements PolicyEngine {
return success.get();
}
+ @Override
+ public synchronized boolean open() {
+
+ /* pre-open hook */
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeOpen(this),
+ (feature, ex) -> logger.error("{}: feature {} before-open failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
+ }
+
+ if (this.locked) {
+ throw new IllegalStateException(ENGINE_LOCKED_MSG);
+ }
+
+ if (!this.alive) {
+ throw new IllegalStateException(ENGINE_STOPPED_MSG);
+ }
+
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ /* Open the unmanaged topics to external components for configuration purposes */
+
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
+
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
+
+ /* post-open hook */
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterOpen(this),
+ (feature, ex) -> logger.error("{}: feature {} after-open failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ return success.get();
+ }
+
@FunctionalInterface
private static interface PredicateWithEx<T> {
public boolean test(T value) throws InterruptedException;