summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java27
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java59
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java2
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java8
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java12
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java114
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java2
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java4
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java2
10 files changed, 115 insertions, 119 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index eb41f803..8780eefc 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,11 +21,13 @@
package org.onap.policy.drools.pooling;
import java.util.List;
-import org.onap.policy.drools.event.comm.FilterableTopicSource;
-import org.onap.policy.drools.event.comm.TopicEndpoint;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
+
+import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,8 +99,7 @@ public class DmaapManager {
}
/**
- * Used by junit tests to set the factory used to create various objects used by this
- * class.
+ * Used by junit tests to set the factory used to create various objects used by this class.
*
* @param factory the new factory
*/
@@ -162,8 +163,8 @@ public class DmaapManager {
/**
* Stops the publisher.
*
- * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued
- * messages and close
+ * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
+ * close
*/
public void stopPublisher(long waitMs) {
if (!publishing) {
@@ -171,8 +172,8 @@ public class DmaapManager {
}
/*
- * Give the sink a chance to transmit messages in the queue. It would be better if
- * "waitMs" could be passed to sink.stop(), but that isn't an option at this time.
+ * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
+ * could be passed to sink.stop(), but that isn't an option at this time.
*/
try {
Thread.sleep(waitMs);
@@ -262,14 +263,14 @@ public class DmaapManager {
* @return the topic sources
*/
public List<TopicSource> getTopicSources() {
- return TopicEndpoint.manager.getTopicSources();
+ return ProxyTopicEndpointManager.getInstance().getTopicSources();
}
/**
* @return the topic sinks
*/
public List<TopicSink> getTopicSinks() {
- return TopicEndpoint.manager.getTopicSinks();
+ return ProxyTopicEndpointManager.getInstance().getTopicSinks();
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index bf35bcf5..815dc548 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -25,13 +25,14 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.TopicEndpoint;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
@@ -44,11 +45,10 @@ import org.slf4j.LoggerFactory;
/**
* Controller/session pooling. Multiple hosts may be launched, all servicing the same
- * controllers/sessions. When this feature is enabled, the requests are divided across the
- * different hosts, instead of all running on a single, active host.
- * <p>
- * With each controller, there is an associated DMaaP topic that is used for internal
- * communication between the different hosts serving the controller.
+ * controllers/sessions. When this feature is enabled, the requests are divided across the different
+ * hosts, instead of all running on a single, active host. <p> With each controller, there is an
+ * associated DMaaP topic that is used for internal communication between the different hosts
+ * serving the controller.
*/
public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerFeatureAPI, DroolsControllerFeatureAPI {
@@ -80,9 +80,9 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
private final CountDownLatch activeLatch = new CountDownLatch(1);
/**
- * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
- * called later. As multiple threads can be active within the methods at the same
- * time, we must keep this in thread local storage.
+ * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
+ * later. As multiple threads can be active within the methods at the same time, we must keep
+ * this in thread local storage.
*/
private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
@@ -128,13 +128,13 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
public boolean beforeStart(PolicyEngine engine) {
logger.info("initializing " + PoolingProperties.FEATURE_NAME);
featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
-
+
// remove any generic pooling topic - always use controller-specific property
featProps.remove(PoolingProperties.POOLING_TOPIC);
-
+
factory.initTopicSources(featProps);
factory.initTopicSinks(featProps);
-
+
return false;
}
@@ -154,7 +154,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
String name = controller.getName();
SpecProperties specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps);
-
+
if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) {
try {
// get & validate the properties
@@ -239,8 +239,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) {
/*
- * As this is invoked a lot, we'll directly call the manager's method instead of
- * using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -270,20 +270,20 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
} catch (IllegalArgumentException | IllegalStateException e) {
logger.warn("cannot get controller for {} {}", droolsController.getGroupId(),
- droolsController.getArtifactId(), e);
+ droolsController.getArtifactId(), e);
return false;
}
if (controller == null) {
logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(),
- droolsController.getArtifactId());
+ droolsController.getArtifactId());
return false;
}
/*
- * As this is invoked a lot, we'll directly call the manager's method instead of
- * using the functional interface via doManager().
+ * As this is invoked a lot, we'll directly call the manager's method instead of using the
+ * functional interface via doManager().
*/
PoolingManagerImpl mgr = ctlr2pool.get(controller.getName());
if (mgr == null) {
@@ -295,7 +295,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
@Override
public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event,
- boolean success) {
+ boolean success) {
// clear any stored arguments
offerArgs.set(null);
@@ -304,8 +304,8 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
}
/**
- * Executes a function using the manager associated with the controller. Catches any
- * exceptions from the function and re-throws it as a runtime exception.
+ * Executes a function using the manager associated with the controller. Catches any exceptions
+ * from the function and re-throws it as a runtime exception.
*
* @param controller
* @param func function to be executed
@@ -349,8 +349,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
/**
*
* @param mgr
- * @return {@code true} if the request was handled by the manager, {@code false}
- * otherwise
+ * @return {@code true} if the request was handled by the manager, {@code false} otherwise
* @throws PoolingFeatureException
*/
public boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
@@ -412,7 +411,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return a new pooling manager
*/
public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
- CountDownLatch activeLatch) {
+ CountDownLatch activeLatch) {
return new PoolingManagerImpl(host, controller, props, activeLatch);
}
@@ -433,7 +432,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sources
*/
public List<TopicSource> initTopicSources(Properties props) {
- return TopicEndpoint.manager.addTopicSources(props);
+ return ProxyTopicEndpointManager.getInstance().addTopicSources(props);
}
/**
@@ -443,7 +442,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sinks
*/
public List<TopicSink> initTopicSinks(Properties props) {
- return TopicEndpoint.manager.addTopicSinks(props);
+ return ProxyTopicEndpointManager.getInstance().addTopicSinks(props);
}
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 68dfee14..02ba4ec9 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -31,8 +31,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
index 6122d361..fb3d4eb2 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
@@ -20,7 +20,7 @@
package org.onap.policy.drools.pooling.message;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.pooling.PoolingFeatureException;
import com.fasterxml.jackson.annotation.JsonIgnore;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index 6509e90e..d48dea5b 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -37,10 +37,10 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.onap.policy.drools.event.comm.FilterableTopicSource;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.pooling.DmaapManager.Factory;
public class DmaapManagerTest {
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index 8683103c..3c3466be 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -52,12 +52,12 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.FilterableTopicSource;
-import org.onap.policy.drools.event.comm.Topic;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index 6280ebed..f25f3d3d 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -27,6 +27,10 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
@@ -39,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -47,36 +52,26 @@ import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.event.comm.TopicEndpoint;
-import org.onap.policy.drools.event.comm.TopicListener;
-import org.onap.policy.drools.event.comm.TopicSink;
-import org.onap.policy.drools.event.comm.TopicSource;
-import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
/**
- * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
- * its own feature object. Uses real feature objects, as well as real DMaaP sources and
- * sinks. However, the following are not:
- * <dl>
- * <dt>PolicyEngine, PolicyController, DroolsController</dt>
- * <dd>mocked</dd>
+ * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
+ * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
+ * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
* </dl>
*
- * <p>
- * The following fields must be set before executing this:
- * <ul>
- * <li>UEB_SERVERS</li>
- * <li>INTERNAL_TOPIC</li>
- * <li>EXTERNAL_TOPIC</li>
- * </ul>
+ * <p> The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
+ * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
*/
public class FeatureTest2 {
@@ -147,10 +142,10 @@ public class FeatureTest2 {
saveManagerFactory = PoolingManagerImpl.getFactory();
saveDmaapFactory = DmaapManager.getFactory();
- externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
+ externalSink = ProxyTopicEndpointManager.getInstance().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
externalSink.start();
- internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+ internalSink = ProxyTopicEndpointManager.getInstance().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
internalSink.start();
}
@@ -222,14 +217,15 @@ public class FeatureTest2 {
private static Properties makeSinkProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
- props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
- props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
- props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
+ props.setProperty(
+ PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX,
+ "false");
return props;
}
@@ -237,21 +233,20 @@ public class FeatureTest2 {
private static Properties makeSourceProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
- props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
- props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
- props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
+ props.setProperty(
+ PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX,
+ "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
- props.setProperty(
- PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
- EXTERNAL_GROUP);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
// consumer instance is generated by the BusConsumer code
}
@@ -378,8 +373,8 @@ public class FeatureTest2 {
/**
* @param droolsController
- * @return the controller associated with a drools controller, or {@code null} if
- * it has no associated controller
+ * @return the controller associated with a drools controller, or {@code null} if it has no
+ * associated controller
*/
public PolicyController getController(DroolsController droolsController) {
return drools2policy.get(droolsController);
@@ -472,8 +467,10 @@ public class FeatureTest2 {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
- externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
- internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
+ externalSource = ProxyTopicEndpointManager.getInstance()
+ .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+ internalSource = ProxyTopicEndpointManager.getInstance()
+ .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
@@ -490,8 +487,8 @@ public class FeatureTest2 {
* Waits, for a period of time, for the host to enter the Active state.
*
* @param timeMs time to wait, in milliseconds
- * @return {@code true} if the host entered the Active state within the given
- * amount of time, {@code false} otherwise
+ * @return {@code true} if the host entered the Active state within the given amount of
+ * time, {@code false} otherwise
* @throws InterruptedException
*/
public boolean awaitActive(long timeMs) throws InterruptedException {
@@ -499,8 +496,8 @@ public class FeatureTest2 {
}
/**
- * Starts threads for the host so that it begins consuming from both the external
- * "DMaaP" topic and its own internal "DMaaP" topic.
+ * Starts threads for the host so that it begins consuming from both the external "DMaaP"
+ * topic and its own internal "DMaaP" topic.
*/
public void start() {
DmaapManager.setFactory(new DmaapManager.Factory() {
@@ -592,8 +589,7 @@ public class FeatureTest2 {
/**
*
- * @return {@code true} if a message was seen for this host, {@code false}
- * otherwise
+ * @return {@code true} if a message was seen for this host, {@code false} otherwise
*/
public boolean messageSeen() {
return sawMsg.get();
@@ -664,8 +660,8 @@ public class FeatureTest2 {
this.context = context;
/*
- * Note: do NOT extract anything from "context" at this point, because it
- * hasn't been fully initialized yet
+ * Note: do NOT extract anything from "context" at this point, because it hasn't been
+ * fully initialized yet
*/
}
@@ -680,15 +676,15 @@ public class FeatureTest2 {
props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
- "" + STD_OFFLINE_PUB_WAIT_MS);
+ "" + STD_OFFLINE_PUB_WAIT_MS);
props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
- "" + STD_START_HEARTBEAT_MS);
+ "" + STD_START_HEARTBEAT_MS);
props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
- "" + STD_ACTIVE_HEARTBEAT_MS);
+ "" + STD_ACTIVE_HEARTBEAT_MS);
props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
- "" + STD_INTER_HEARTBEAT_MS);
+ "" + STD_INTER_HEARTBEAT_MS);
props.putAll(makeSinkProperties(INTERNAL_TOPIC));
props.putAll(makeSourceProperties(INTERNAL_TOPIC));
@@ -741,8 +737,8 @@ public class FeatureTest2 {
public ManagerFactory(Context context) {
/*
- * Note: do NOT extract anything from "context" at this point, because it
- * hasn't been fully initialized yet
+ * Note: do NOT extract anything from "context" at this point, because it hasn't been
+ * fully initialized yet
*/
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 32264e3a..c57a9f6f 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -39,7 +39,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.pooling.PoolingFeature.Factory;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngine;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index d74b87f9..d90bac4b 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -47,8 +47,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.drools.pooling.PoolingManagerImpl.Factory;
import org.onap.policy.drools.pooling.extractor.ClassExtractors;
import org.onap.policy.drools.pooling.message.BucketAssignments;
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
index bc92fa27..2549fa94 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
public class ForwardTest extends BasicMessageTester<Forward> {
// values set by makeValidMessage()