aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java59
1 files changed, 29 insertions, 30 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index bf35bcf5..815dc548 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -25,13 +25,14 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.TopicEndpoint;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
@@ -44,11 +45,10 @@ import org.slf4j.LoggerFactory;
/**
* Controller/session pooling. Multiple hosts may be launched, all servicing the same
- * controllers/sessions. When this feature is enabled, the requests are divided across the
- * different hosts, instead of all running on a single, active host.
- * <p>
- * With each controller, there is an associated DMaaP topic that is used for internal
- * communication between the different hosts serving the controller.
+ * controllers/sessions. When this feature is enabled, the requests are divided across the different
+ * hosts, instead of all running on a single, active host. <p> With each controller, there is an
+ * associated DMaaP topic that is used for internal communication between the different hosts
+ * serving the controller.
*/
public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI {
@@ -80,9 +80,9 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
- * called later. As multiple threads can be active within the methods at the same
- * time, we must keep this in thread local storage.
+ * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
+ * later. As multiple threads can be active within the methods at the same time, we must keep
+ * this in thread local storage.
*/
private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
@@ -128,13 +128,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public boolean beforeStart(PolicyEngine engine) {
logger.info("initializing " + PoolingProperties.FEATURE_NAME);
featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
-
+
// remove any generic pooling topic - always use controller-specific property
featProps.remove(PoolingProperties.POOLING_TOPIC);
-
+
factory.initTopicSources(featProps);
factory.initTopicSinks(featProps);
-
+
return false;
}
@@ -154,7 +154,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
String name = controller.getName();
SpecProperties specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
-
+
if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
try {
// get & validate the properties
@@ -239,8 +239,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
/*
- * As this is invoked a lot, we'll directly call the manager's method instead of
- * using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -270,20 +270,20 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
} catch (IllegalArgumentException | IllegalStateException e) {
logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
- droolsController.getArtifactId(), e);
+ droolsController.getArtifactId(), e);
return false;
}
if (controller == null) {
logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
- droolsController.getArtifactId());
+ droolsController.getArtifactId());
return false;
}
/*
- * As this is invoked a lot, we'll directly call the manager's method instead of
- * using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -295,7 +295,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
- boolean success) {
+ boolean success) {
// clear any stored arguments
offerArgs.set(null);
@@ -304,8 +304,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
/**
- * Executes a function using the manager associated with the controller. Catches any
- * exceptions from the function and re-throws it as a runtime exception.
+ * Executes a function using the manager associated with the controller. Catches any exceptions
+ * from the function and re-throws it as a runtime exception.
*
* @param controller
* @param func function to be executed
@@ -349,8 +349,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
*
* @param mgr
- * @return {@code true} if the request was handled by the manager, {@code false}
- * otherwise
+ * @return {@code true} if the request was handled by the manager, {@code false} otherwise
* @throws PoolingFeatureException
*/
public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
@@ -412,7 +411,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return a new pooling manager
*/
public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
- CountDownLatch activeLatch) {
+ CountDownLatch activeLatch) {
return new PoolingManagerImpl(host, controller, props, activeLatch);
}
@@ -433,7 +432,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sources
*/
public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ return ProxyTopicEndpointManager.getInstance().addTopicSources(props);
}
/**
@@ -443,7 +442,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sinks
*/
public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ return ProxyTopicEndpointManager.getInstance().addTopicSinks(props);
}
}
}