diff options
Diffstat (limited to 'feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java')
-rw-r--r-- | feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java index 31ad207c..ac2b31a8 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java @@ -76,9 +76,9 @@ public class EndToEndFeatureTest { private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class); /** - * UEB servers for both internal & external topics. + * KAFKA servers for both internal & external topics. */ - private static final String UEB_SERVERS = "ueb-server"; + private static final String SERVER = "localhost:9092"; /** * Name of the topic used for inter-host communication. @@ -101,7 +101,7 @@ public class EndToEndFeatureTest { private static final String CONTROLLER1 = "controller.one"; /** - * Maximum number of items to fetch from DMaaP in a single poll. + * Maximum number of items to fetch from Kafka in a single poll. */ private static final String FETCH_LIMIT = "5"; @@ -124,12 +124,12 @@ public class EndToEndFeatureTest { private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>(); /** - * Sink for external DMaaP topic. + * Sink for external Kafka topic. */ private static TopicSink externalSink; /** - * Sink for internal DMaaP topic. + * Sink for internal Kafka topic. */ private static TopicSink internalSink; @@ -237,13 +237,13 @@ public class EndToEndFeatureTest { private static Properties makeSinkProperties(String topic) { Properties props = new Properties(); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic); + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); return props; @@ -255,7 +255,7 @@ public class EndToEndFeatureTest { props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic @@ -348,14 +348,14 @@ public class EndToEndFeatureTest { * Starts the hosts. */ public void startHosts() { - hosts.forEach(host -> host.start()); + hosts.forEach(Host::start); } /** * Stops the hosts. */ public void stopHosts() { - hosts.forEach(host -> host.stop()); + hosts.forEach(Host::stop); } /** @@ -489,10 +489,10 @@ public class EndToEndFeatureTest { when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)) - .get(0); - internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC)) - .get(0); + externalSource = TopicEndpointManager.getManager() + .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0); + internalSource = TopicEndpointManager.getManager() + .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0); // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { @@ -646,7 +646,7 @@ public class EndToEndFeatureTest { if (!host.beforeInsert(fact)) { // feature did not handle it so we handle it here - host.afterInsert(fact, result); + host.afterInsert(fact, true); host.sawMessage(); context.addEvent(); |