aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-messages/src')
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java36
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java2
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java32
-rw-r--r--feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java27
4 files changed, 49 insertions, 48 deletions
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
index 31ad207c..ac2b31a8 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java
@@ -76,9 +76,9 @@ public class EndToEndFeatureTest {
private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
/**
- * UEB servers for both internal & external topics.
+ * KAFKA servers for both internal & external topics.
*/
- private static final String UEB_SERVERS = "ueb-server";
+ private static final String SERVER = "localhost:9092";
/**
* Name of the topic used for inter-host communication.
@@ -101,7 +101,7 @@ public class EndToEndFeatureTest {
private static final String CONTROLLER1 = "controller.one";
/**
- * Maximum number of items to fetch from DMaaP in a single poll.
+ * Maximum number of items to fetch from Kafka in a single poll.
*/
private static final String FETCH_LIMIT = "5";
@@ -124,12 +124,12 @@ public class EndToEndFeatureTest {
private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
/**
- * Sink for external DMaaP topic.
+ * Sink for external Kafka topic.
*/
private static TopicSink externalSink;
/**
- * Sink for internal DMaaP topic.
+ * Sink for internal Kafka topic.
*/
private static TopicSink internalSink;
@@ -237,13 +237,13 @@ public class EndToEndFeatureTest {
private static Properties makeSinkProperties(String topic) {
Properties props = new Properties();
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
- props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
return props;
@@ -255,7 +255,7 @@ public class EndToEndFeatureTest {
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
+ + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
@@ -348,14 +348,14 @@ public class EndToEndFeatureTest {
* Starts the hosts.
*/
public void startHosts() {
- hosts.forEach(host -> host.start());
+ hosts.forEach(Host::start);
}
/**
* Stops the hosts.
*/
public void stopHosts() {
- hosts.forEach(host -> host.stop());
+ hosts.forEach(Host::stop);
}
/**
@@ -489,10 +489,10 @@ public class EndToEndFeatureTest {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
- externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC))
- .get(0);
- internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC))
- .get(0);
+ externalSource = TopicEndpointManager.getManager()
+ .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+ internalSource = TopicEndpointManager.getManager()
+ .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
@@ -646,7 +646,7 @@ public class EndToEndFeatureTest {
if (!host.beforeInsert(fact)) {
// feature did not handle it so we handle it here
- host.afterInsert(fact, result);
+ host.afterInsert(fact, true);
host.sawMessage();
context.addEvent();
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index e9bd3cb5..c507b68a 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -794,7 +794,7 @@ class FeatureTest {
if (msg == null) {
return;
}
- listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
+ listener.onTopicEvent(CommInfrastructure.KAFKA, topic, msg);
}
}
}
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 1b05e021..1eb9e361 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -298,7 +298,7 @@ class PoolingFeatureTest {
@Test
void testBeforeOffer() {
- assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1));
verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure that the args were captured
@@ -307,7 +307,7 @@ class PoolingFeatureTest {
// ensure it's still in the map by re-invoking
- assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ assertFalse(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2));
verify(mgr1).beforeOffer(TOPIC2, EVENT2);
// ensure that the new args were captured
@@ -315,12 +315,12 @@ class PoolingFeatureTest {
verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
- assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1));
}
@Test
void testBeforeOffer_NotFound() {
- assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1));
}
@Test
@@ -329,28 +329,28 @@ class PoolingFeatureTest {
// manager will return true
when(mgr1.beforeOffer(any(), any())).thenReturn(true);
- assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1));
verify(mgr1).beforeOffer(TOPIC1, EVENT1);
// ensure it's still in the map by re-invoking
- assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
+ assertTrue(pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2));
verify(mgr1).beforeOffer(TOPIC2, EVENT2);
- assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
+ assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC1, EVENT1));
}
@Test
void testBeforeInsert() {
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
// ensure it's still in the map by re-invoking
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(drools1, OBJECT2));
verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
- pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
}
@@ -375,7 +375,7 @@ class PoolingFeatureTest {
}
};
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
verify(mgr1, never()).beforeInsert(any(), any());
}
@@ -390,7 +390,7 @@ class PoolingFeatureTest {
}
};
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
verify(mgr1, never()).beforeInsert(any(), any());
}
@@ -406,7 +406,7 @@ class PoolingFeatureTest {
}
};
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1);
assertFalse(pool.beforeInsert(drools1, OBJECT1));
verify(mgr1, never()).beforeInsert(any(), any());
}
@@ -414,17 +414,17 @@ class PoolingFeatureTest {
@Test
void testBeforeInsert_NotFound() {
- pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
+ pool.beforeOffer(controllerDisabled, CommInfrastructure.KAFKA, TOPIC2, EVENT2);
assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
}
@Test
void testAfterOffer() {
// this will create OfferArgs
- pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
+ pool.beforeOffer(controller1, CommInfrastructure.KAFKA, TOPIC1, EVENT1);
// this should clear them
- assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
+ assertFalse(pool.afterOffer(controller1, CommInfrastructure.KAFKA, TOPIC2, EVENT2, true));
assertFalse(pool.beforeInsert(drools1, OBJECT1));
verify(mgr1, never()).beforeInsert(any(), any());
diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index ac60ae27..98300683 100644
--- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.LinkedList;
+import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -504,7 +505,7 @@ class PoolingManagerImplTest {
}
@Test
- void testOnTopicEvent() throws Exception {
+ void testOnTopicEvent() {
startMgr();
StartState st = (StartState) mgr.getCurrent();
@@ -517,16 +518,16 @@ class PoolingManagerImplTest {
String msg = ser.encodeMsg(hb);
- mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+ mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg);
assertInstanceOf(QueryState.class, mgr.getCurrent());
}
@Test
- void testOnTopicEvent_NullEvent() throws Exception {
+ void testOnTopicEvent_NullEvent() {
startMgr();
- assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
+ assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.KAFKA, TOPIC2, null)).doesNotThrowAnyException();
}
@Test
@@ -563,12 +564,12 @@ class PoolingManagerImplTest {
@Test
void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
- validateHandleReqId(null);
+ validateHandleReqId();
}
@Test
void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
- validateHandleReqId("");
+ validateHandleReqId();
}
@Test
@@ -717,7 +718,7 @@ class PoolingManagerImplTest {
String msg = ser.encodeMsg(hb);
- mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+ mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg);
assertInstanceOf(QueryState.class, mgr.getCurrent());
}
@@ -726,7 +727,7 @@ class PoolingManagerImplTest {
void testHandleInternal_IoEx() throws Exception {
startMgr();
- mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
+ mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, "invalid message");
assertInstanceOf(StartState.class, mgr.getCurrent());
}
@@ -746,7 +747,7 @@ class PoolingManagerImplTest {
String msg = ser.encodeMsg(hb);
- mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+ mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg);
assertInstanceOf(StartState.class, mgr.getCurrent());
}
@@ -859,7 +860,7 @@ class PoolingManagerImplTest {
String msg = ser.encodeMsg(hb);
- mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
+ mgr.onTopicEvent(CommInfrastructure.KAFKA, MY_TOPIC, msg);
assertInstanceOf(QueryState.class, mgr.getCurrent());
@@ -870,7 +871,7 @@ class PoolingManagerImplTest {
assertEquals(1, latch.getCount());
}
- private void validateHandleReqId(String requestId) throws PoolingFeatureException {
+ private void validateHandleReqId() {
startMgr();
assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
@@ -887,7 +888,7 @@ class PoolingManagerImplTest {
verify(topicMessageManager, times(START_PUB)).publish(any());
}
- private void validateUnhandled() throws PoolingFeatureException {
+ private void validateUnhandled() {
startMgr();
assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
}
@@ -985,7 +986,7 @@ class PoolingManagerImplTest {
@Override
protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
- if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
+ if (drools2 == drools && TOPIC2.equals(topic2) && Objects.equals(event, THE_EVENT)) {
return DECODED_EVENT;
} else {
return null;