diff options
Diffstat (limited to 'feature-pooling-messages/src')
4 files changed, 49 insertions, 48 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java index 31ad207c..ac2b31a8 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java @@ -76,9 +76,9 @@ public class EndToEndFeatureTest { private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class); /** - * UEB servers for both internal & external topics. + * KAFKA servers for both internal & external topics. */ - private static final String UEB_SERVERS = "ueb-server"; + private static final String SERVER = "localhost:9092"; /** * Name of the topic used for inter-host communication. @@ -101,7 +101,7 @@ public class EndToEndFeatureTest { private static final String CONTROLLER1 = "controller.one"; /** - * Maximum number of items to fetch from DMaaP in a single poll. + * Maximum number of items to fetch from Kafka in a single poll. */ private static final String FETCH_LIMIT = "5"; @@ -124,12 +124,12 @@ public class EndToEndFeatureTest { private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>(); /** - * Sink for external DMaaP topic. + * Sink for external Kafka topic. */ private static TopicSink externalSink; /** - * Sink for internal DMaaP topic. + * Sink for internal Kafka topic. */ private static TopicSink internalSink; @@ -237,13 +237,13 @@ public class EndToEndFeatureTest { private static Properties makeSinkProperties(String topic) { Properties props = new Properties(); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic); + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); - props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); return props; @@ -255,7 +255,7 @@ public class EndToEndFeatureTest { props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS); + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic @@ -348,14 +348,14 @@ public class EndToEndFeatureTest { * Starts the hosts. */ public void startHosts() { - hosts.forEach(host -> host.start()); + hosts.forEach(Host::start); } /** * Stops the hosts. */ public void stopHosts() { - hosts.forEach(host -> host.stop()); + hosts.forEach(Host::stop); } /** @@ -489,10 +489,10 @@ public class EndToEndFeatureTest { when(controller.getName()).thenReturn(CONTROLLER1); when(controller.getDrools()).thenReturn(drools); - externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)) - .get(0); - internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC)) - .get(0); + externalSource = TopicEndpointManager.getManager() + .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0); + internalSource = TopicEndpointManager.getManager() + .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0); // stop consuming events if the controller stops when(controller.stop()).thenAnswer(args -> { @@ -646,7 +646,7 @@ public class EndToEndFeatureTest { if (!host.beforeInsert(fact)) { // feature did not handle it so we handle it here - host.afterInsert(fact, result); + host.afterInsert(fact, true); host.sawMessage(); context.addEvent(); diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index e9bd3cb5..c507b68a 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -794,7 +794,7 @@ class FeatureTest { if (msg == null) { return; } - listener.onTopicEvent(CommInfrastructure.UEB, topic, msg); + listener.onTopicEvent(CommInfrastructure.KAFKA, topic, msg); } } } diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index 1b05e021..1eb9e361 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -298,7 +298,7 @@ class PoolingFeatureTest { @Test void testBeforeOffer() { - assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1)); + assertFalse(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1)); verify(mgr1).beforeOffer(TOPIC1, EVENT1); // ensure that the args were captured @@ -307,7 +307,7 @@ class PoolingFeatureTest { // ensure it's still in the map by re-invoking - assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2)); + assertFalse(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2)); verify(mgr1).beforeOffer(TOPIC2, EVENT2); // ensure that the new args were captured @@ -315,12 +315,12 @@ class PoolingFeatureTest { verify(mgr1).beforeInsert(TOPIC2, OBJECT2); - assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1)); + assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1)); } @Test void testBeforeOffer_NotFound() { - assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1)); + assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1)); } @Test @@ -329,28 +329,28 @@ class PoolingFeatureTest { // manager will return true when(mgr1.beforeOffer(any(), any())).thenReturn(true); - assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1)); + assertTrue(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1)); verify(mgr1).beforeOffer(TOPIC1, EVENT1); // ensure it's still in the map by re-invoking - assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2)); + assertTrue(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2)); verify(mgr1).beforeOffer(TOPIC2, EVENT2); - assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1)); + assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1)); } @Test void testBeforeInsert() { - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1); assertFalse(pool.beforeInsert(drools1, OBJECT1)); verify(mgr1).beforeInsert(TOPIC1, OBJECT1); // ensure it's still in the map by re-invoking - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2); assertFalse(pool.beforeInsert(drools1, OBJECT2)); verify(mgr1).beforeInsert(TOPIC2, OBJECT2); - pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2); + pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC2, EVENT2); assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1)); } @@ -375,7 +375,7 @@ class PoolingFeatureTest { } }; - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1); assertFalse(pool.beforeInsert(drools1, OBJECT1)); verify(mgr1, never()).beforeInsert(any(), any()); } @@ -390,7 +390,7 @@ class PoolingFeatureTest { } }; - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1); assertFalse(pool.beforeInsert(drools1, OBJECT1)); verify(mgr1, never()).beforeInsert(any(), any()); } @@ -406,7 +406,7 @@ class PoolingFeatureTest { } }; - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1); assertFalse(pool.beforeInsert(drools1, OBJECT1)); verify(mgr1, never()).beforeInsert(any(), any()); } @@ -414,17 +414,17 @@ class PoolingFeatureTest { @Test void testBeforeInsert_NotFound() { - pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2); + pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC2, EVENT2); assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1)); } @Test void testAfterOffer() { // this will create OfferArgs - pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1); + pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1); // this should clear them - assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true)); + assertFalse(pool.afterOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2, true)); assertFalse(pool.beforeInsert(drools1, OBJECT1)); verify(mgr1, never()).beforeInsert(any(), any()); diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index ac60ae27..98300683 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.LinkedList; +import java.util.Objects; import java.util.Properties; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -504,7 +505,7 @@ class PoolingManagerImplTest { } @Test - void testOnTopicEvent() throws Exception { + void testOnTopicEvent() { startMgr(); StartState st = (StartState) mgr.getCurrent(); @@ -517,16 +518,16 @@ class PoolingManagerImplTest { String msg = ser.encodeMsg(hb); - mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg); + mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg); assertInstanceOf(QueryState.class, mgr.getCurrent()); } @Test - void testOnTopicEvent_NullEvent() throws Exception { + void testOnTopicEvent_NullEvent() { startMgr(); - assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException(); + assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.KAFKA, TOPIC2, null)).doesNotThrowAnyException(); } @Test @@ -563,12 +564,12 @@ class PoolingManagerImplTest { @Test void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception { - validateHandleReqId(null); + validateHandleReqId(); } @Test void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception { - validateHandleReqId(""); + validateHandleReqId(); } @Test @@ -717,7 +718,7 @@ class PoolingManagerImplTest { String msg = ser.encodeMsg(hb); - mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg); + mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg); assertInstanceOf(QueryState.class, mgr.getCurrent()); } @@ -726,7 +727,7 @@ class PoolingManagerImplTest { void testHandleInternal_IoEx() throws Exception { startMgr(); - mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message"); + mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, "invalid message"); assertInstanceOf(StartState.class, mgr.getCurrent()); } @@ -746,7 +747,7 @@ class PoolingManagerImplTest { String msg = ser.encodeMsg(hb); - mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg); + mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg); assertInstanceOf(StartState.class, mgr.getCurrent()); } @@ -859,7 +860,7 @@ class PoolingManagerImplTest { String msg = ser.encodeMsg(hb); - mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg); + mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg); assertInstanceOf(QueryState.class, mgr.getCurrent()); @@ -870,7 +871,7 @@ class PoolingManagerImplTest { assertEquals(1, latch.getCount()); } - private void validateHandleReqId(String requestId) throws PoolingFeatureException { + private void validateHandleReqId() { startMgr(); assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT)); @@ -887,7 +888,7 @@ class PoolingManagerImplTest { verify(topicMessageManager, times(START_PUB)).publish(any()); } - private void validateUnhandled() throws PoolingFeatureException { + private void validateUnhandled() { startMgr(); assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT)); } @@ -985,7 +986,7 @@ class PoolingManagerImplTest { @Override protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) { - if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) { + if (drools2 == drools && TOPIC2.equals(topic2) && Objects.equals(event, THE_EVENT)) { return DECODED_EVENT; } else { return null; |