summaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages')
-rw-r--r--feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties26
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java2
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java4
-rw-r--r--feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java10
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java20
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java9
6 files changed, 35 insertions, 36 deletions
diff --git a/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties b/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties
index 925d1698..8497538e 100644
--- a/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties
+++ b/feature-pooling-messages/src/main/feature/config/feature-pooling-messages.properties
@@ -72,18 +72,18 @@
# be enabled at the same time.
pooling.usecases.enabled=true
-pooling.usecases.topic=${env:POOLING_TOPIC}
+pooling.usecases.topic=${envd:POOLING_TOPIC}
# the list of sources and sinks should be identical
-kafka.source.topics=POOLING_TOPIC
-kafka.sink.topics=POOLING_TOPIC
-
-kafka.source.topics.POOLING_TOPIC.servers=${env:KAFKA_SERVERS}
-kafka.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-kafka.source.topics.POOLING_TOPIC.apiKey=
-kafka.source.topics.POOLING_TOPIC.apiSecret=
-
-kafka.sink.topics.POOLING_TOPIC.servers=${env:kafka_SERVERS}
-kafka.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-kafka.sink.topics.POOLING_TOPIC.apiKey=
-kafka.sink.topics.POOLING_TOPIC.apiSecret=
+kafka.source.topics=${envd:POOLING_TOPIC}
+kafka.sink.topics=${envd:POOLING_TOPIC}
+
+kafka.source.topics.policy-pdp-pooling.servers=${envd:KAFKA_SERVERS}
+kafka.source.topics.policy-pdp-pooling.effectiveTopic=${envd:POOLING_TOPIC}
+kafka.source.topics.policy-pdp-pooling.apiKey=
+kafka.source.topics.policy-pdp-pooling.apiSecret=
+
+kafka.sink.topics.policy-pdp-pooling.servers=${envd:KAFKA_SERVERS}
+kafka.sink.topics.policy-pdp-pooling.effectiveTopic=${envd:POOLING_TOPIC}
+kafka.sink.topics.policy-pdp-pooling.apiKey=
+kafka.sink.topics.policy-pdp-pooling.apiSecret=
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 6411dd81..a1368d8c 100644
--- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* hosts, instead of all running on a single, active host.
*
* <p>With each controller, there is an
- * associated DMaaP topic that is used for internal communication between the different hosts
+ * associated topic that is used for internal communication between the different hosts
* serving the controller.
*/
public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi {
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index 5e358e61..20f51023 100644
--- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -46,10 +46,10 @@ public interface PoolingManager {
String getHost();
/**
- * Gets the name of the internal DMaaP topic used by this manager to communicate with
+ * Gets the name of the internal topic used by this manager to communicate with
* its other hosts.
*
- * @return the name of the internal DMaaP topic
+ * @return the name of the internal topic
*/
String getTopic();
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 7c0436eb..fec6ba81 100644
--- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -91,13 +91,13 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
private final Serializer serializer;
/**
- * Internal DMaaP topic used by this controller.
+ * Internal topic used by this controller.
*/
@Getter
private final String topic;
/**
- * Manager for the internal DMaaP topic.
+ * Manager for the internal topic.
*/
private final TopicMessageManager topicMessageManager;
@@ -157,7 +157,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
throw new PoolingFeatureRtException(e);
} catch (PoolingFeatureException e) {
- logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
+ logger.error("failed to attach internal topic to controller {}", controller.getName());
throw new PoolingFeatureRtException(e);
}
}
@@ -598,9 +598,9 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
}
/**
- * Creates a DMaaP manager.
+ * Creates a topic manager.
*
- * @param topic name of the internal DMaaP topic
+ * @param topic name of the internal topic
* @return a new topic messages manager
* @throws PoolingFeatureException if an error occurs
*/
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
index ac2b31a8..1ad5d513 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
@@ -64,11 +64,11 @@ import org.slf4j.LoggerFactory;
/**
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
- * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
+ * feature object. Uses real feature objects, as well as real sources and sinks. However, the
* following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
* </dl>
*
- * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
+ * <p>The following fields must be set before executing this: <ul> <li>SERVER</li>
* <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
*/
public class EndToEndFeatureTest {
@@ -252,18 +252,18 @@ public class EndToEndFeatureTest {
private static Properties makeSourceProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS, topic);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
// consumer instance is generated by the BusConsumer code
@@ -520,8 +520,8 @@ public class EndToEndFeatureTest {
}
/**
- * Starts threads for the host so that it begins consuming from both the external "DMaaP"
- * topic and its own internal "DMaaP" topic.
+ * Starts threads for the host so that it begins consuming from both the external
+ * topic and its own internal topic.
*/
public void start() {
feature.beforeStart(engine);
@@ -775,7 +775,7 @@ public class EndToEndFeatureTest {
}
/**
- * DMaaP Manager with overrides.
+ * TopicManager with overrides.
*/
private static class TopicMessageManagerImpl extends TopicMessageManager {
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index c507b68a..f1b48b3e 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -68,7 +68,6 @@ import org.slf4j.LoggerFactory;
* End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
* its own feature object. Uses real feature objects. However, the following are not:
* <dl>
- * <dt>DMaaP sources and sinks</dt>
* <dd>simulated using queues. There is one queue for the external topic, and one queue
* for each host's internal topic. Messages published to the "admin" channel are simply
* sent to all of the hosts' internal topic queues</dd>
@@ -430,13 +429,13 @@ class FeatureTest {
private final AtomicBoolean sawMsg = new AtomicBoolean(false);
/**
- * This host's internal "DMaaP" topic.
+ * This host's internal topic.
*/
private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
- * Queue for the external "DMaaP" topic.
+ * Queue for the external topic.
*/
@Getter
private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
@@ -486,7 +485,7 @@ class FeatureTest {
/**
* Starts threads for the host so that it begins consuming from both the external
- * "DMaaP" topic and its own internal "DMaaP" topic.
+ * topic and its own internal topic.
*/
public void start() {
@@ -987,7 +986,7 @@ class FeatureTest {
}
/**
- * DMaaP Manager with overrides.
+ * TopicManager with overrides.
*/
private static class TopicMessageManagerImpl extends TopicMessageManager {