aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event')
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java180
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicFactoryTestBase.java20
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java133
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java154
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java34
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java207
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java34
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java)6
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java75
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java)55
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java)6
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java)70
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java)8
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java3
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java6
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicFactoryTestBase.java10
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java46
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java117
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java210
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java346
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java218
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java5
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java41
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java17
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java81
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java)22
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java52
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java98
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java)17
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java19
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java453
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java)5
32 files changed, 986 insertions, 1762 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
index a661b063..b6777db7 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,16 +32,11 @@ import static org.junit.Assert.assertTrue;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-
import org.junit.After;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicPropertyBuilder;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories;
-import org.onap.policy.common.endpoints.event.comm.bus.UebTopicPropertyBuilder;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@@ -51,14 +47,8 @@ public class TopicEndpointProxyTest {
private static final String NOOP_SOURCE_TOPIC = "noop-source";
private static final String NOOP_SINK_TOPIC = "noop-sink";
- private static final String UEB_SOURCE_TOPIC = "ueb-source";
- private static final String UEB_SINK_TOPIC = "ueb-sink";
-
- private static final String DMAAP_SOURCE_TOPIC = "dmaap-source";
- private static final String DMAAP_SINK_TOPIC = "dmaap-sink";
-
- private Properties configuration = new Properties();
- private TopicParameterGroup group = new TopicParameterGroup();
+ private final Properties configuration = new Properties();
+ private final TopicParameterGroup group = new TopicParameterGroup();
/**
* Constructor.
@@ -79,30 +69,6 @@ public class TopicEndpointProxyTest {
configuration.putAll(noopSinkBuilder.build());
group.getTopicSinks().add(noopSinkBuilder.getParams());
- UebTopicPropertyBuilder uebSourceBuilder =
- new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS)
- .makeTopic(UEB_SOURCE_TOPIC);
- configuration.putAll(uebSourceBuilder.build());
- group.getTopicSources().add(uebSourceBuilder.getParams());
-
- UebTopicPropertyBuilder uebSinkBuilder =
- new UebTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS)
- .makeTopic(UEB_SINK_TOPIC);
- configuration.putAll(uebSinkBuilder.build());
- group.getTopicSinks().add(uebSinkBuilder.getParams());
-
- DmaapTopicPropertyBuilder dmaapSourceBuilder =
- new DmaapTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS)
- .makeTopic(DMAAP_SOURCE_TOPIC);
- configuration.putAll(dmaapSourceBuilder.build());
- group.getTopicSources().add(dmaapSourceBuilder.getParams());
-
- DmaapTopicPropertyBuilder dmaapSinkBuilder =
- new DmaapTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS)
- .makeTopic(DMAAP_SINK_TOPIC);
- configuration.putAll(dmaapSinkBuilder.build());
- group.getTopicSinks().add(dmaapSinkBuilder.getParams());
-
TopicParameters invalidCommInfraParams =
new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
.makeTopic(NOOP_SOURCE_TOPIC).getParams();
@@ -116,21 +82,19 @@ public class TopicEndpointProxyTest {
}
private <T extends Topic> boolean allSources(List<T> topics) {
- return exists(topics, NOOP_SOURCE_TOPIC) && exists(topics, UEB_SOURCE_TOPIC)
- && exists(topics, DMAAP_SOURCE_TOPIC);
+ return exists(topics, NOOP_SOURCE_TOPIC);
}
private <T extends Topic> boolean allSinks(List<T> topics) {
- return exists(topics, NOOP_SINK_TOPIC) && exists(topics, UEB_SINK_TOPIC) && exists(topics, DMAAP_SINK_TOPIC);
+ return exists(topics, NOOP_SINK_TOPIC);
}
private <T extends Topic> boolean anySource(List<T> topics) {
- return exists(topics, NOOP_SOURCE_TOPIC) || exists(topics, UEB_SOURCE_TOPIC)
- || exists(topics, DMAAP_SOURCE_TOPIC);
+ return exists(topics, NOOP_SOURCE_TOPIC);
}
private <T extends Topic> boolean anySink(List<T> topics) {
- return exists(topics, NOOP_SINK_TOPIC) || exists(topics, UEB_SINK_TOPIC) || exists(topics, DMAAP_SINK_TOPIC);
+ return exists(topics, NOOP_SINK_TOPIC);
}
/**
@@ -140,12 +104,6 @@ public class TopicEndpointProxyTest {
public void tearDown() {
NoopTopicFactories.getSinkFactory().destroy();
NoopTopicFactories.getSourceFactory().destroy();
-
- UebTopicFactories.getSinkFactory().destroy();
- UebTopicFactories.getSourceFactory().destroy();
-
- DmaapTopicFactories.getSinkFactory().destroy();
- DmaapTopicFactories.getSourceFactory().destroy();
}
@Test
@@ -164,7 +122,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
- assertSame(3, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -175,7 +133,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSource> sources = manager.addTopicSources(configuration);
- assertSame(3, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -186,7 +144,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
- assertSame(3, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
@@ -197,7 +155,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<TopicSink> sinks = manager.addTopicSinks(configuration);
- assertSame(3, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
@@ -208,7 +166,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<Topic> topics = manager.addTopics(configuration);
- assertSame(6, topics.size());
+ assertSame(2, topics.size());
assertTrue(allSources(topics));
assertTrue(allSinks(topics));
@@ -219,7 +177,7 @@ public class TopicEndpointProxyTest {
TopicEndpoint manager = new TopicEndpointProxy();
List<Topic> topics = manager.addTopics(group);
- assertSame(6, topics.size());
+ assertSame(2, topics.size());
assertTrue(allSources(topics));
assertTrue(allSinks(topics));
@@ -258,7 +216,7 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
List<TopicSource> sources = manager.getTopicSources();
- assertSame(3, sources.size());
+ assertSame(1, sources.size());
assertTrue(allSources(sources));
assertFalse(anySink(sources));
@@ -272,29 +230,13 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
List<TopicSink> sinks = manager.getTopicSinks();
- assertSame(3, sinks.size());
+ assertSame(1, sinks.size());
assertFalse(anySource(sinks));
assertTrue(allSinks(sinks));
}
@Test
- public void testGetUebTopicSources() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSources(configuration);
- assertSame(1, manager.getUebTopicSources().size());
- }
-
- @Test
- public void testGetDmaapTopicSources() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSources(configuration);
- assertSame(1, manager.getDmaapTopicSources().size());
- }
-
- @Test
public void testGetNoopTopicSources() {
TopicEndpoint manager = new TopicEndpointProxy();
@@ -303,22 +245,6 @@ public class TopicEndpointProxyTest {
}
@Test
- public void testGetUebTopicSinks() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSinks(configuration);
- assertSame(1, manager.getUebTopicSinks().size());
- }
-
- @Test
- public void testGetDmaapTopicSinks() {
- TopicEndpoint manager = new TopicEndpointProxy();
-
- manager.addTopicSinks(configuration);
- assertSame(1, manager.getDmaapTopicSinks().size());
- }
-
- @Test
public void testGetNoopTopicSinks() {
TopicEndpoint manager = new TopicEndpointProxy();
@@ -360,15 +286,9 @@ public class TopicEndpointProxyTest {
manager.addTopicSources(configuration);
assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic());
- assertSame(UEB_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.UEB, UEB_SOURCE_TOPIC).getTopic());
- assertSame(DMAAP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.DMAAP, DMAAP_SOURCE_TOPIC).getTopic());
assertThatIllegalStateException()
.isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.UEB, UEB_SINK_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.DMAAP, DMAAP_SINK_TOPIC));
}
@Test
@@ -377,71 +297,9 @@ public class TopicEndpointProxyTest {
manager.addTopicSinks(configuration);
assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic());
- assertSame(UEB_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.UEB, UEB_SINK_TOPIC).getTopic());
- assertSame(DMAAP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.DMAAP, DMAAP_SINK_TOPIC).getTopic());
assertThatIllegalStateException()
.isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.UEB, UEB_SOURCE_TOPIC));
- assertThatIllegalStateException()
- .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.DMAAP, DMAAP_SOURCE_TOPIC));
- }
-
- @Test
- public void testGetUebTopicSource() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSources(configuration);
-
- assertSame(UEB_SOURCE_TOPIC, manager.getUebTopicSource(UEB_SOURCE_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(NOOP_SOURCE_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSource(DMAAP_SOURCE_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSource(""));
- }
-
- @Test
- public void testGetUebTopicSink() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSinks(configuration);
-
- assertSame(UEB_SINK_TOPIC, manager.getUebTopicSink(UEB_SINK_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(NOOP_SINK_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getUebTopicSink(DMAAP_SINK_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getUebTopicSink(""));
- }
-
- @Test
- public void testGetDmaapTopicSource() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSources(configuration);
-
- assertSame(DMAAP_SOURCE_TOPIC, manager.getDmaapTopicSource(DMAAP_SOURCE_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSource(NOOP_SOURCE_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSource(UEB_SOURCE_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSource(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSource(""));
- }
-
- @Test
- public void testGetDmaapTopicSink() {
- TopicEndpoint manager = new TopicEndpointProxy();
- manager.addTopicSinks(configuration);
-
- assertSame(DMAAP_SINK_TOPIC, manager.getDmaapTopicSink(DMAAP_SINK_TOPIC).getTopic());
-
- assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSink(NOOP_SINK_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getDmaapTopicSink(UEB_SINK_TOPIC));
-
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSink(null));
- assertThatIllegalArgumentException().isThrownBy(() -> manager.getDmaapTopicSink(""));
}
@Test
@@ -451,9 +309,6 @@ public class TopicEndpointProxyTest {
assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic());
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(DMAAP_SOURCE_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSource(UEB_SOURCE_TOPIC));
-
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null));
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(""));
}
@@ -465,9 +320,6 @@ public class TopicEndpointProxyTest {
assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic());
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(DMAAP_SINK_TOPIC));
- assertThatIllegalStateException().isThrownBy(() -> manager.getNoopTopicSink(UEB_SINK_TOPIC));
-
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null));
assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(""));
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicFactoryTestBase.java
index b5c9e614..7a819e0d 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicFactoryTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicFactoryTestBase.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
@@ -82,7 +84,7 @@ public abstract class BusTopicFactoryTestBase<T extends Topic> extends TopicFact
assertNotNull(item2);
assertEquals(item.getTopic(), item.getEffectiveTopic());
assertNotEquals(item2.getTopic(), item2.getEffectiveTopic());
- assertTrue(item != item2);
+ assertNotSame(item, item2);
// duplicate topics, but since they aren't managed, they should be different
T item3 = buildTopic(makeBuilder().managed(false).build());
@@ -91,9 +93,9 @@ public abstract class BusTopicFactoryTestBase<T extends Topic> extends TopicFact
assertNotNull(item4);
assertEquals(MY_TOPIC, item4.getTopic());
assertEquals(TOPIC2, item4.getEffectiveTopic());
- assertTrue(item != item3);
- assertTrue(item != item4);
- assertTrue(item3 != item4);
+ assertNotSame(item, item3);
+ assertNotSame(item, item4);
+ assertNotSame(item3, item4);
// two managed topics
T item5 = buildTopic(makeBuilder().build());
@@ -102,8 +104,8 @@ public abstract class BusTopicFactoryTestBase<T extends Topic> extends TopicFact
assertNotNull(item6);
// re-build same managed topics - should get exact same objects
- assertTrue(item5 == buildTopic(makeBuilder().topic(MY_TOPIC).build()));
- assertTrue(item6 == buildTopic(makeBuilder().topic(TOPIC2).build()));
+ assertSame(item5, buildTopic(makeBuilder().topic(MY_TOPIC).build()));
+ assertSame(item6, buildTopic(makeBuilder().topic(TOPIC2).build()));
}
/**
@@ -135,11 +137,11 @@ public abstract class BusTopicFactoryTestBase<T extends Topic> extends TopicFact
T item2 = buildTopic(servers, TOPIC2);
assertNotNull(item2);
- assertTrue(item1 != item2);
+ assertNotSame(item1, item2);
// duplicate - should be the same, as these topics are managed
T item3 = buildTopic(servers, TOPIC2);
- assertTrue(item2 == item3);
+ assertSame(item2, item3);
}
/**
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java
deleted file mode 100644
index 92d5a865..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactoryTestBase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
-import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
-import static org.junit.Assert.assertEquals;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX;
-
-import java.util.Map;
-import org.onap.policy.common.endpoints.event.comm.Topic;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-/**
- * Base class for DmaapTopicXxxFactory tests.
- *
- * @param <T> type of topic managed by the factory
- */
-public abstract class DmaapTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> {
-
- public static final String MY_CONN_TIMEOUT = "200";
- public static final String MY_READ_TIMEOUT = "201";
- public static final String MY_ROUNDTRIP_TIMEOUT = "202";
- public static final String MY_STICKINESS = "true";
- public static final String MY_SUBCONTEXT = "my-subcontext";
- public static final String MY_DME_VERSION = "my-version";
-
- @Override
- public void testBuildProperties() {
-
- super.testBuildProperties();
-
- // check properties specific to DMaaP/DME2
- initFactory();
-
- assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
-
- BusTopicParams params = getLastParams();
- assertEquals(MY_ENV, params.getEnvironment());
- assertEquals(MY_LAT, params.getLatitude());
- assertEquals(MY_LONG, params.getLongitude());
- assertEquals(MY_PARTNER, params.getPartner());
-
- Map<String, String> add = params.getAdditionalProps();
- assertEquals(MY_CONN_TIMEOUT, add.get(PolicyEndPointProperties.DME2_EP_CONN_TIMEOUT_PROPERTY));
- assertEquals(MY_READ_TIMEOUT, add.get(PolicyEndPointProperties.DME2_READ_TIMEOUT_PROPERTY));
- assertEquals(MY_ROUNDTRIP_TIMEOUT, add.get(PolicyEndPointProperties.DME2_ROUNDTRIP_TIMEOUT_PROPERTY));
- assertEquals(MY_ROUTE, add.get(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY));
- assertEquals(MY_STICKINESS, add.get(PolicyEndPointProperties.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY));
- assertEquals(MY_SUBCONTEXT, add.get(PolicyEndPointProperties.DME2_SUBCONTEXT_PATH_PROPERTY));
- assertEquals(MY_DME_VERSION, add.get(PolicyEndPointProperties.DME2_VERSION_PROPERTY));
- }
-
- @Override
- public void testBuildProperties_Variations() {
- super.testBuildProperties_Variations();
-
- // check "additional" properties
- expectNullAddProp(PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX,
- PolicyEndPointProperties.DME2_EP_CONN_TIMEOUT_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX,
- PolicyEndPointProperties.DME2_READ_TIMEOUT_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX,
- PolicyEndPointProperties.DME2_ROUNDTRIP_TIMEOUT_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX,
- PolicyEndPointProperties.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX,
- PolicyEndPointProperties.DME2_SUBCONTEXT_PATH_PROPERTY);
-
- expectNullAddProp(PROPERTY_DMAAP_DME2_VERSION_SUFFIX, PolicyEndPointProperties.DME2_VERSION_PROPERTY);
- }
-
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
-
- // check parameters that were used
- BusTopicParams params = getLastParams();
- assertEquals(false, params.isAllowSelfSignedCerts());
- }
-
- /**
- * Tests exception cases with get(topic). DMaaP topics are special in that they
- * throw IllegalArgumentException, even for an unknown topic name; all of the
- * other Topic Factory classes throw IllegalStateException, thus we override
- * the default test method.
- */
- @Override
- public void testGet_Ex() {
- // null topic
- assertThatIllegalArgumentException().as("null topic").isThrownBy(() -> getTopic(null));
-
- // empty topic
- assertThatIllegalArgumentException().as("empty topic").isThrownBy(() -> getTopic(""));
-
- // unknown topic
- initFactory();
- buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
-
- assertThatIllegalStateException().as("unknown topic").isThrownBy(() -> getTopic(TOPIC2));
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java
deleted file mode 100644
index 2e9a6cd7..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_AFT_ENV;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_KEY;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_SECRET;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_GROUP;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_INST;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_ENV;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_LIMIT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_TIMEOUT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_LAT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_LONG;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTNER;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_ROUTE;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-
-import java.util.Arrays;
-import lombok.Getter;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
-
-public class DmaapTopicPropertyBuilder extends TopicPropertyBuilder {
-
- public static final String SERVER = "my-server";
- public static final String TOPIC2 = "my-topic-2";
-
- public static final String MY_CONN_TIMEOUT = "200";
- public static final String MY_READ_TIMEOUT = "201";
- public static final String MY_ROUNDTRIP_TIMEOUT = "202";
- public static final String MY_STICKINESS = "true";
- public static final String MY_SUBCONTEXT = "my-subcontext";
- public static final String MY_DME_VERSION = "my-version";
- public static final String MY_AAF_MECHID = "my-aaf-mechid";
- public static final String MY_AAF_PASS = "my-aaf-passwd";
-
- @Getter
- private TopicParameters params = new TopicParameters();
-
- /**
- * Constructs the object.
- *
- * @param prefix the prefix for the properties to be built
- */
- public DmaapTopicPropertyBuilder(String prefix) {
- super(prefix);
- }
-
- /**
- * Adds a topic and configures it's properties with default values.
- *
- * @param topic the topic to be added
- * @return this builder
- */
- public DmaapTopicPropertyBuilder makeTopic(String topic) {
- addTopic(topic);
-
- setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, MY_CONS_GROUP);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, MY_CONS_INST);
- setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
- setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
- setTopicProperty(PROPERTY_TOPIC_AAF_MECHID_SUFFIX, MY_AAF_MECHID);
- setTopicProperty(PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, MY_AAF_PASS);
- setTopicProperty(PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, MY_AFT_ENV);
- setTopicProperty(PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, "true");
- setTopicProperty(PROPERTY_TOPIC_API_KEY_SUFFIX, MY_API_KEY);
- setTopicProperty(PROPERTY_TOPIC_API_SECRET_SUFFIX, MY_API_SECRET);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, MY_FETCH_LIMIT);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, MY_FETCH_TIMEOUT);
- setTopicProperty(PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, MY_CONN_TIMEOUT);
- setTopicProperty(PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX, MY_ENV);
- setTopicProperty(PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, MY_LAT);
- setTopicProperty(PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, MY_LONG);
- setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
- setTopicProperty(PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, MY_PARTNER);
- setTopicProperty(PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, MY_READ_TIMEOUT);
- setTopicProperty(PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, MY_ROUNDTRIP_TIMEOUT);
- setTopicProperty(PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, MY_ROUTE);
- setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
- setTopicProperty(PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, MY_STICKINESS);
- setTopicProperty(PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, MY_SUBCONTEXT);
- setTopicProperty(PROPERTY_DMAAP_DME2_VERSION_SUFFIX, MY_DME_VERSION);
-
- params.setTopicCommInfrastructure("dmaap");
- params.setTopic(topic);
- params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
- params.setConsumerGroup(MY_CONS_GROUP);
- params.setConsumerInstance(MY_CONS_INST);
- params.setManaged(true);
- params.setUseHttps(true);
- params.setUserName(MY_AAF_MECHID);
- params.setPassword(MY_AAF_PASS);
- params.setAftEnvironment(MY_AFT_ENV);
- params.setAllowSelfSignedCerts(true);
- params.setApiKey(MY_API_KEY);
- params.setApiSecret(MY_API_SECRET);
- params.setFetchLimit(MY_FETCH_LIMIT);
- params.setFetchTimeout(MY_FETCH_TIMEOUT);
- params.setEnvironment(MY_ENV);
- params.setLatitude(MY_LAT);
- params.setLongitude(MY_LONG);
- params.setPartitionId(MY_PARTITION);
- params.setPartner(MY_PARTNER);
- params.setServers(Arrays.asList(SERVER));
-
- return this;
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java
deleted file mode 100644
index 9136108a..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Test;
-
-public class DmaapTopicSinkTest {
-
- @Test
- public void test() {
- assertNotNull(DmaapTopicFactories.getSinkFactory());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java
deleted file mode 100644
index b4c2f758..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class DmaapTopicSourceFactoryTest extends DmaapTopicFactoryTestBase<DmaapTopicSource> {
-
- private SourceFactory factory;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- factory = new SourceFactory();
- }
-
- @After
- public void tearDown() {
- factory.destroy();
- }
-
- @Test
- @Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams();
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
- public void testBuildProperties() {
- super.testBuildProperties();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_CONS_GROUP, params.getConsumerGroup());
- assertEquals(MY_CONS_INST, params.getConsumerInstance());
- assertEquals(MY_FETCH_LIMIT, params.getFetchLimit());
- assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout());
-
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
-
- // check default values for source-specific parameters
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- params2 -> params2.getFetchLimit() == PolicyEndPointProperties.DEFAULT_LIMIT_FETCH,
- null, "", "invalid-limit-number");
-
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- params2 -> params2.getFetchTimeout() == PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH,
- null, "", "invalid-timeout-number");
- }
-
- @Test
- public void testBuildListOfStringStringStringString() {
- DmaapTopicSource source1 = factory.build(servers, MY_TOPIC, MY_API_KEY, MY_API_SECRET);
- assertNotNull(source1);
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_API_KEY, params.getApiKey());
- assertEquals(MY_API_SECRET, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
- }
-
- @Test
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(null, params.getApiKey());
- assertEquals(null, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
- }
-
- @Test
- @Override
- public void testDestroyString_testGet_testInventory() {
- super.testDestroyString_testGet_testInventory();
- super.testDestroyString_Ex();
- }
-
- @Test
- @Override
- public void testDestroy() {
- super.testDestroy();
- }
-
- @Test
- public void testGet() {
- super.testGet_Ex();
- }
-
- @Test
- public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedDmaapTopicSourceFactory ["));
- }
-
- @Override
- protected void initFactory() {
- if (factory != null) {
- factory.destroy();
- }
-
- factory = new SourceFactory();
- }
-
- @Override
- protected List<DmaapTopicSource> buildTopics(Properties properties) {
- return factory.build(properties);
- }
-
- @Override
- protected DmaapTopicSource buildTopic(BusTopicParams params) {
- return factory.build(params);
- }
-
- @Override
- protected DmaapTopicSource buildTopic(List<String> servers, String topic) {
- return factory.build(servers, topic);
- }
-
- @Override
- protected void destroyFactory() {
- factory.destroy();
- }
-
- @Override
- protected void destroyTopic(String topic) {
- factory.destroy(topic);
- }
-
- @Override
- protected List<DmaapTopicSource> getInventory() {
- return factory.inventory();
- }
-
- @Override
- protected DmaapTopicSource getTopic(String topic) {
- return factory.get(topic);
- }
-
- @Override
- protected BusTopicParams getLastParams() {
- return factory.params.getLast();
- }
-
- @Override
- protected TopicPropertyBuilder makePropBuilder() {
- return new DmaapTopicPropertyBuilder(PROPERTY_DMAAP_SOURCE_TOPICS);
- }
-
- /**
- * Factory that records the parameters of all of the sources it creates.
- */
- private static class SourceFactory extends IndexedDmaapTopicSourceFactory {
- private Deque<BusTopicParams> params = new LinkedList<>();
-
- @Override
- protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) {
- params.add(busTopicParams);
- return super.makeSource(busTopicParams);
- }
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java
deleted file mode 100644
index 1735e2ee..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Test;
-
-public class DmaapTopicSourceTest {
-
- @Test
- public void test() {
- assertNotNull(DmaapTopicFactories.getSourceFactory());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
index 41dbac8c..3dfd96dd 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactoryTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,11 +26,11 @@ import java.util.Collections;
import org.onap.policy.common.endpoints.event.comm.Topic;
/**
- * Base class for UebTopicXxxFactory tests.
+ * Base class for KafkaTopicXxxFactory tests.
*
* @param <T> type of topic managed by the factory
*/
-public abstract class UebTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> {
+public abstract class KafkaTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> {
@Override
public void testBuildBusTopicParams_Ex() {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
new file mode 100644
index 00000000..a00879c1
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
+import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
+
+import java.util.Arrays;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
+
+ public static final String SERVER = "localhost:9092";
+ public static final String TOPIC2 = "my-topic-2";
+
+ @Getter
+ private TopicParameters params = new TopicParameters();
+
+ /**
+ * Constructs the object.
+ *
+ * @param prefix the prefix for the properties to be built
+ */
+ public KafkaTopicPropertyBuilder(String prefix) {
+ super(prefix);
+ }
+
+ /**
+ * Adds a topic and configures it's properties with default values.
+ *
+ * @param topic the topic to be added
+ * @return this builder
+ */
+ public KafkaTopicPropertyBuilder makeTopic(String topic) {
+ addTopic(topic);
+
+ setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
+ setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
+ setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
+ setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
+ setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+
+ params.setTopicCommInfrastructure("kafka");
+ params.setTopic(topic);
+ params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
+ params.setManaged(true);
+ params.setUseHttps(true);
+ params.setPartitionId(MY_PARTITION);
+ params.setServers(Arrays.asList(SERVER));
+
+ return this;
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
index 4896a9df..52868c44 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,8 +21,10 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
import java.util.Deque;
import java.util.LinkedList;
@@ -33,9 +35,10 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSink> {
+public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
private SinkFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
/**
* Creates the object to be tested.
@@ -67,22 +70,34 @@ public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSin
// check parameters that were used
BusTopicParams params = getLastParams();
- assertEquals(false, params.isAllowSelfSignedCerts());
+ assertFalse(params.isAllowSelfSignedCerts());
}
@Test
@Override
public void testBuildProperties() {
- super.testBuildProperties();
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
+ List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+ assertEquals(1, topics.size());
+ assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertTrue(params.isManaged());
+ assertFalse(params.isUseHttps());
+ assertEquals(List.of(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
+ assertEquals(MY_PARTITION, params.getPartitionId());
+
+ List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
+ .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
+ assertEquals(1, topics2.size());
+ assertEquals(TOPIC3, topics2.get(0).getTopic());
+ assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
initFactory();
assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
-
- BusTopicParams params = getLastParams();
- assertEquals(MY_PARTITION, params.getPartitionId());
}
@Test
@@ -105,7 +120,7 @@ public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSin
@Test
public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedUebTopicSinkFactory ["));
+ assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
}
@Override
@@ -118,17 +133,17 @@ public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSin
}
@Override
- protected List<UebTopicSink> buildTopics(Properties properties) {
+ protected List<KafkaTopicSink> buildTopics(Properties properties) {
return factory.build(properties);
}
@Override
- protected UebTopicSink buildTopic(BusTopicParams params) {
+ protected KafkaTopicSink buildTopic(BusTopicParams params) {
return factory.build(params);
}
@Override
- protected UebTopicSink buildTopic(List<String> servers, String topic) {
+ protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
return factory.build(servers, topic);
}
@@ -143,12 +158,12 @@ public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSin
}
@Override
- protected List<UebTopicSink> getInventory() {
+ protected List<KafkaTopicSink> getInventory() {
return factory.inventory();
}
@Override
- protected UebTopicSink getTopic(String topic) {
+ protected KafkaTopicSink getTopic(String topic) {
return factory.get(topic);
}
@@ -159,17 +174,17 @@ public class UebTopicSinkFactoryTest extends UebTopicFactoryTestBase<UebTopicSin
@Override
protected TopicPropertyBuilder makePropBuilder() {
- return new UebTopicPropertyBuilder(PROPERTY_UEB_SINK_TOPICS);
+ return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
}
/**
- * Factory that records the parameters of all of the sinks it creates.
+ * Factory that records the parameters of all the sinks it creates.
*/
- private static class SinkFactory extends IndexedUebTopicSinkFactory {
+ private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
private Deque<BusTopicParams> params = new LinkedList<>();
@Override
- protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
+ protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
params.add(busTopicParams);
return super.makeSink(busTopicParams);
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
index 77452604..483e4e99 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,11 +24,11 @@ import static org.junit.Assert.assertNotNull;
import org.junit.Test;
-public class UebTopicSinkTest {
+public class KafkaTopicSinkTest {
@Test
public void test() {
- assertNotNull(UebTopicFactories.getSinkFactory());
+ assertNotNull(KafkaTopicFactories.getSinkFactory());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
index 6c9dfcbd..392cefe9 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
@@ -1,8 +1,8 @@
/*
* ============LICENSE_START=======================================================
- * policy-endpoints
+ * ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,9 +21,11 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
@@ -33,9 +35,11 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTopicSink> {
+public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSource> {
- private SinkFactory factory;
+ private SourceFactory factory;
+
+ public static final String KAFKA_SERVER = "localhost:9092";
/**
* Creates the object to be tested.
@@ -45,7 +49,7 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
public void setUp() {
super.setUp();
- factory = new SinkFactory();
+ factory = new SourceFactory();
}
@After
@@ -55,27 +59,21 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
@Test
@Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams();
- super.testBuildBusTopicParams_Ex();
- }
+ public void testBuildProperties() {
- @Test
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
- }
+ initFactory();
- @Test
- @Override
- public void testBuildProperties() {
- super.testBuildProperties();
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
+ List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+ assertEquals(1, topics.size());
+ assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
- // check sink-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_PARTITION, params.getPartitionId());
+ BusTopicParams params = getLastParams();
+ assertTrue(params.isManaged());
+ assertFalse(params.isUseHttps());
+ assertEquals(List.of(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
}
@Test
@@ -98,7 +96,7 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
@Test
public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedDmaapTopicSinkFactory ["));
+ assertTrue(factory.toString().startsWith("IndexedKafkaTopicSourceFactory ["));
}
@Override
@@ -107,21 +105,21 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
factory.destroy();
}
- factory = new SinkFactory();
+ factory = new SourceFactory();
}
@Override
- protected List<DmaapTopicSink> buildTopics(Properties properties) {
+ protected List<KafkaTopicSource> buildTopics(Properties properties) {
return factory.build(properties);
}
@Override
- protected DmaapTopicSink buildTopic(BusTopicParams params) {
+ protected KafkaTopicSource buildTopic(BusTopicParams params) {
return factory.build(params);
}
@Override
- protected DmaapTopicSink buildTopic(List<String> servers, String topic) {
+ protected KafkaTopicSource buildTopic(List<String> servers, String topic) {
return factory.build(servers, topic);
}
@@ -136,12 +134,12 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
}
@Override
- protected List<DmaapTopicSink> getInventory() {
+ protected List<KafkaTopicSource> getInventory() {
return factory.inventory();
}
@Override
- protected DmaapTopicSink getTopic(String topic) {
+ protected KafkaTopicSource getTopic(String topic) {
return factory.get(topic);
}
@@ -152,19 +150,19 @@ public class DmaapTopicSinkFactoryTest extends DmaapTopicFactoryTestBase<DmaapTo
@Override
protected TopicPropertyBuilder makePropBuilder() {
- return new DmaapTopicPropertyBuilder(PROPERTY_DMAAP_SINK_TOPICS);
+ return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SOURCE_TOPICS);
}
/**
- * Factory that records the parameters of all of the sinks it creates.
+ * Factory that records the parameters of all the sources it creates.
*/
- private static class SinkFactory extends IndexedDmaapTopicSinkFactory {
+ private static class SourceFactory extends IndexedKafkaTopicSourceFactory {
private Deque<BusTopicParams> params = new LinkedList<>();
@Override
- protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) {
+ protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
params.add(busTopicParams);
- return super.makeSink(busTopicParams);
+ return super.makeSource(busTopicParams);
}
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
index 9ef8af84..5079e601 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP Policy Engine - Common Modules
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,11 +24,11 @@ import static org.junit.Assert.assertNotNull;
import org.junit.Test;
-public class UebTopicSourceTest {
+public class KafkaTopicSourceTest {
@Test
- public void test() {
- assertNotNull(UebTopicFactories.getSourceFactory());
+ public void verifyKafkaTopicFactoriesNotNull() {
+ assertNotNull(KafkaTopicFactories.getSourceFactory());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java
index 0974a041..6cdb628a 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicEndpointTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify;
import java.util.Arrays;
import java.util.Collections;
-
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java
index 5cb9bcbd..af7a4b29 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactoryTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
@@ -136,7 +136,7 @@ public abstract class NoopTopicFactoryTest<F extends NoopTopicFactory<T>, T exte
initFactory();
assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC)
.setTopicProperty(PROPERTY_MANAGED_SUFFIX, "false").build()).size());
- assertThatThrownBy(() -> factory.get(MY_TOPIC));
+ assertThatIllegalStateException().isThrownBy(() -> factory.get(MY_TOPIC));
// managed undefined - default to true
initFactory();
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicFactoryTestBase.java
index d8a16428..9795fd30 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicFactoryTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicFactoryTestBase.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
@@ -126,9 +128,9 @@ public abstract class TopicFactoryTestBase<T extends Topic> extends TopicTestBas
int index = 0;
T item = lst.get(index++);
- assertTrue(item != lst.get(index++));
- assertTrue(item == lst.get(index++));
- assertTrue(item == lst.get(index++));
+ assertNotSame(item, lst.get(index++));
+ assertSame(item, lst.get(index++));
+ assertSame(item, lst.get(index++));
}
/**
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
index 8b75fa35..00111fb2 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +60,8 @@ public class TopicTestBase {
public static final String ROUTE_PROP = "routeOffer";
public static final String MY_ROUTE = "my-route";
+ public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final int KAFKA_PORT = 9092;
/**
* Message used within exceptions that are expected.
@@ -76,6 +79,11 @@ public class TopicTestBase {
protected List<String> servers;
/**
+ * Servers to be added to the parameter builder.
+ */
+ protected List<String> kafkaServers;
+
+ /**
* Parameter builder used to build topic parameters.
*/
protected TopicParamsBuilder builder;
@@ -89,13 +97,14 @@ public class TopicTestBase {
addProps.put("my-key-B", "my-value-B");
servers = Arrays.asList("svra", "svrb");
+ kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092");
builder = makeBuilder();
}
/**
* Makes a fully populated parameter builder.
- *
+ *
* @return a new parameter builder
*/
public TopicParamsBuilder makeBuilder() {
@@ -117,6 +126,39 @@ public class TopicTestBase {
.fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
.longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
.password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC)
- .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME);
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).allowTracing(true).userName(MY_USERNAME)
+ .serializationProvider(MY_SERIALIZER);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder() {
+ addProps.clear();
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;";
+ addProps.put("sasl.jaas.config", jaas);
+ addProps.put("sasl.mechanism", "SCRAM-SHA-512");
+ addProps.put("security.protocol", "SASL_PLAINTEXT");
+
+ return makeKafkaBuilder(addProps, kafkaServers);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @param addProps additional properties to be added to the builder
+ * @param servers servers to be added to the builder
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) {
+
+ return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+ .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+ .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER).fetchTimeout(MY_FETCH_TIMEOUT)
+ .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC)
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false).allowTracing(true);
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java
deleted file mode 100644
index cb5507e5..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicPropertyBuilder.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_AFT_ENV;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_KEY;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_API_SECRET;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_GROUP;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_CONS_INST;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_LIMIT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_FETCH_TIMEOUT;
-import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-
-import java.util.Arrays;
-import lombok.Getter;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
-
-public class UebTopicPropertyBuilder extends TopicPropertyBuilder {
-
- public static final String SERVER = "my-server";
- public static final String TOPIC2 = "my-topic-2";
-
- public static final String MY_AAF_MECHID = "my-aaf-mechid";
- public static final String MY_AAF_PASS = "my-aaf-passwd";
-
- @Getter
- private TopicParameters params = new TopicParameters();
-
- /**
- * Constructs the object.
- *
- * @param prefix the prefix for the properties to be built
- */
- public UebTopicPropertyBuilder(String prefix) {
- super(prefix);
- }
-
- /**
- * Adds a topic and configures it's properties with default values.
- *
- * @param topic the topic to be added
- * @return this builder
- */
- public UebTopicPropertyBuilder makeTopic(String topic) {
- addTopic(topic);
-
- setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, MY_CONS_GROUP);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, MY_CONS_INST);
- setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
- setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
- setTopicProperty(PROPERTY_TOPIC_AAF_MECHID_SUFFIX, MY_AAF_MECHID);
- setTopicProperty(PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, MY_AAF_PASS);
- setTopicProperty(PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, MY_AFT_ENV);
- setTopicProperty(PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, "true");
- setTopicProperty(PROPERTY_TOPIC_API_KEY_SUFFIX, MY_API_KEY);
- setTopicProperty(PROPERTY_TOPIC_API_SECRET_SUFFIX, MY_API_SECRET);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, MY_FETCH_LIMIT);
- setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, MY_FETCH_TIMEOUT);
- setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
- setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
-
- params.setTopicCommInfrastructure("ueb");
- params.setTopic(topic);
- params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
- params.setConsumerGroup(MY_CONS_GROUP);
- params.setConsumerInstance(MY_CONS_INST);
- params.setManaged(true);
- params.setUseHttps(true);
- params.setUserName(MY_AAF_MECHID);
- params.setPassword(MY_AAF_PASS);
- params.setAftEnvironment(MY_AFT_ENV);
- params.setAllowSelfSignedCerts(true);
- params.setApiKey(MY_API_KEY);
- params.setApiSecret(MY_API_SECRET);
- params.setFetchLimit(MY_FETCH_LIMIT);
- params.setFetchTimeout(MY_FETCH_TIMEOUT);
- params.setPartitionId(MY_PARTITION);
- params.setServers(Arrays.asList(SERVER));
-
- return this;
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java
deleted file mode 100644
index 81e30756..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactoryTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine - Common Modules
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX;
-import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-
-public class UebTopicSourceFactoryTest extends UebTopicFactoryTestBase<UebTopicSource> {
-
- private SourceFactory factory;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- factory = new SourceFactory();
- }
-
- @After
- public void tearDown() {
- factory.destroy();
- }
-
- @Test
- @Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams();
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
- public void testBuildProperties() {
-
- super.testBuildProperties();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_CONS_GROUP, params.getConsumerGroup());
- assertEquals(MY_CONS_INST, params.getConsumerInstance());
- assertEquals(MY_FETCH_LIMIT, params.getFetchLimit());
- assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout());
-
- super.testBuildProperties_Variations();
- super.testBuildProperties_Multiple();
-
- // check default values for source-specific parameters
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
- params2 -> params2.getFetchLimit() == PolicyEndPointProperties.DEFAULT_LIMIT_FETCH,
- null, "", "invalid-limit-number");
-
- checkDefault(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
- params2 -> params2.getFetchTimeout() == PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH,
- null, "", "invalid-timeout-number");
- }
-
- @Test
- public void testBuildListOfStringStringStringString() {
- UebTopicSource source1 = factory.build(servers, MY_TOPIC, MY_API_KEY, MY_API_SECRET);
- assertNotNull(source1);
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(MY_API_KEY, params.getApiKey());
- assertEquals(MY_API_SECRET, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
- }
-
- @Test
- @Override
- public void testBuildListOfStringString() {
- super.testBuildListOfStringString();
-
- // check source-specific parameters that were used
- BusTopicParams params = factory.params.getFirst();
- assertEquals(null, params.getApiKey());
- assertEquals(null, params.getApiSecret());
- assertEquals(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH, params.getFetchLimit());
- assertEquals(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout());
-
- assertEquals(true, params.isAllowSelfSignedCerts());
- }
-
- @Test
- @Override
- public void testDestroyString_testGet_testInventory() {
- super.testDestroyString_testGet_testInventory();
- super.testDestroyString_Ex();
- }
-
- @Test
- @Override
- public void testDestroy() {
- super.testDestroy();
- }
-
- @Test
- public void testGet() {
- super.testGet_Ex();
- }
-
- @Test
- public void testToString() {
- assertTrue(factory.toString().startsWith("IndexedUebTopicSourceFactory ["));
- }
-
- @Override
- protected void initFactory() {
- if (factory != null) {
- factory.destroy();
- }
-
- factory = new SourceFactory();
- }
-
- @Override
- protected List<UebTopicSource> buildTopics(Properties properties) {
- return factory.build(properties);
- }
-
- @Override
- protected UebTopicSource buildTopic(BusTopicParams params) {
- return factory.build(params);
- }
-
- @Override
- protected UebTopicSource buildTopic(List<String> servers, String topic) {
- return factory.build(servers, topic);
- }
-
- @Override
- protected void destroyFactory() {
- factory.destroy();
- }
-
- @Override
- protected void destroyTopic(String topic) {
- factory.destroy(topic);
- }
-
- @Override
- protected List<UebTopicSource> getInventory() {
- return factory.inventory();
- }
-
- @Override
- protected UebTopicSource getTopic(String topic) {
- return factory.get(topic);
- }
-
- @Override
- protected BusTopicParams getLastParams() {
- return factory.params.getLast();
- }
-
- @Override
- protected TopicPropertyBuilder makePropBuilder() {
- return new UebTopicPropertyBuilder(PROPERTY_UEB_SOURCE_TOPICS);
- }
-
- /**
- * Factory that records the parameters of all of the sources it creates.
- */
- private static class SourceFactory extends IndexedUebTopicSourceFactory {
- private Deque<BusTopicParams> params = new LinkedList<>();
-
- @Override
- protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
- params.add(busTopicParams);
- return super.makeSource(busTopicParams);
- }
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
index 0255c100..2c33a257 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,218 +21,257 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.IOException;
-import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import org.apache.commons.collections4.IteratorUtils;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
-import org.powermock.reflect.Whitebox;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
public class BusConsumerTest extends TopicTestBase {
+ private static final int SHORT_TIMEOUT_MILLIS = 10;
+ private static final int LONG_TIMEOUT_MILLIS = 3000;
+
+ @Mock
+ KafkaConsumer<String, String> mockedKafkaConsumer;
+
+ AutoCloseable closeable;
+
@Before
@Override
public void setUp() {
super.setUp();
+ closeable = MockitoAnnotations.openMocks(this);
}
- @Test
- public void testCambriaConsumerWrapper() {
- // verify that different wrappers can be built
- new CambriaConsumerWrapper(makeBuilder().build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().userName(null).build());
- new CambriaConsumerWrapper(makeBuilder().password(null).build());
- new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build());
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
}
- @Test
- public void testCambriaConsumerWrapperFetch() throws Exception {
- CambriaConsumer inner = mock(CambriaConsumer.class);
- List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
- when(inner.fetch()).thenReturn(lst);
-
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- Whitebox.setInternalState(cons, "consumer", inner);
-
- assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
- // arrange to throw exception next time fetch is called
- IOException ex = new IOException(EXPECTED);
- when(inner.fetch()).thenThrow(ex);
-
- cons.fetchTimeout = 10;
-
- try {
- cons.fetch();
- fail("missing exception");
-
- } catch (IOException e) {
- assertEquals(ex, e);
- }
+ @Test
+ public void testFetchingBusConsumer() {
+ // should not be negative
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be zero
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be too large
+ cons = new FetchingBusConsumerImpl(
+ makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be what was specified
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(100);
}
@Test
- public void testCambriaConsumerWrapperClose() {
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
-
- // set filter several times to cause different branches of close() to be executed
- for (int count = 0; count < 3; ++count) {
- cons.close();
- cons.setFilter("close=" + count);
- }
+ public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
+
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
+
+ private CountDownLatch started = new CountDownLatch(1);
+
+ @Override
+ protected void sleepAfterFetchFailure() {
+ started.countDown();
+ super.sleepAfterFetchFailure();
+ }
+ };
+
+ // full sleep
+ long tstart = System.currentTimeMillis();
+ cons.sleepAfterFetchFailure();
+ assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
+
+ // close while sleeping - sleep should halt prematurely
+ cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+ cons.started = new CountDownLatch(1);
+ Thread thread = new Thread(cons::sleepAfterFetchFailure);
+ tstart = System.currentTimeMillis();
+ thread.start();
+ cons.started.await();
+ cons.close();
+ thread.join();
+ assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
+
+ // interrupt while sleeping - sleep should halt prematurely
+ cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+ cons.started = new CountDownLatch(1);
+ thread = new Thread(cons::sleepAfterFetchFailure);
+ tstart = System.currentTimeMillis();
+ thread.start();
+ cons.started.await();
+ thread.interrupt();
+ thread.join();
+ assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
}
@Test
- public void testCambriaConsumerWrapperSetFilter() {
- // set filter several times to cause different branches to be executed
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- for (int count = 0; count < 3; ++count) {
- cons.setFilter("set-filter=" + count);
- }
+ public void testKafkaConsumerWrapper() {
+ // verify that different wrappers can be built
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
}
- @Test
- public void testCambriaConsumerWrapperToString() {
- assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
+ @Test(expected = IllegalArgumentException.class)
+ public void testKafkaConsumerWrapper_InvalidTopic() {
+ new KafkaConsumerWrapper(makeBuilder().topic(null).build());
}
@Test
- public void testDmaapConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapAafConsumerWrapper(makeBuilder().build());
+ public void testKafkaConsumerWrapperFetch() {
+
+ //Setup Properties for consumer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ kafkaProps.setProperty("enable.auto.commit", "true");
+ kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+ kafka.consumer = consumer;
+
+ assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
+ consumer.close();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
- new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
+ @Test
+ public void testFetchNoMessages() throws IOException {
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
+
+ when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
+
+ verify(mockedKafkaConsumer, times(1)).poll(any());
+
+ assertThat(result != null);
+
+ assertThat(!result.iterator().hasNext());
+
+ mockedKafkaConsumer.close();
}
@Test
- public void testDmaapConsumerWrapperFetch() throws Exception {
- DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
- MRConsumerImpl cons = mock(MRConsumerImpl.class);
+ public void testFetchWithMessages() {
+ // Setup
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
- dmaap.fetchTimeout = 5;
- dmaap.consumer = cons;
+ ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
+ ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
- // null return
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
- assertFalse(dmaap.fetch().iterator().hasNext());
+ when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
- // with messages, 200
- List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
- MRConsumerResponse resp = new MRConsumerResponse();
- resp.setResponseCode("200");
- resp.setActualMessages(lst);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
- assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+ verify(mockedKafkaConsumer, times(1)).poll(any());
- // null messages
- resp.setActualMessages(null);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
- assertFalse(dmaap.fetch().iterator().hasNext());
+ assertThat(result != null);
- // with messages, NOT 200
- resp.setResponseCode("400");
- resp.setActualMessages(lst);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ assertThat(result.iterator().hasNext());
- assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
- }
+ assertThat(result.iterator().next().equals("value"));
- @Test
- public void testDmaapConsumerWrapperClose() throws Exception {
- new DmaapAafConsumerWrapper(makeBuilder().build()).close();
+ mockedKafkaConsumer.close();
}
@Test
- public void testDmaapConsumerWrapperToString() throws Exception {
- assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
- }
+ public void testFetchWithMessagesAndTraceparent() {
+ // Setup
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
- @Test
- public void testDmaapAafConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
- new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build());
- }
+ ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
+ record.headers().add(
+ "traceparent",
+ "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
+ );
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
- /*
- * Unfortunately, the MR code intercepts this and throws an exception before the
- * wrapper gets a chance to check it, thus this test does not improve the coverage
- * for the constructor.
- */
- new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
- }
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
+ ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
- @Test
- public void testDmaapAafConsumerWrapperToString() throws Exception {
- assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
- }
+ when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
- @Test
- public void testDmaapDmeConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapDmeConsumerWrapper(makeBuilder().build());
- new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
- new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
- new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
-
- addProps.put(ROUTE_PROP, MY_ROUTE);
- new DmaapDmeConsumerWrapper(makeBuilder().build());
- new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
- }
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
- }
+ verify(mockedKafkaConsumer, times(1)).poll(any());
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
+ verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
+
+ assertThat(result != null);
+
+ assertThat(result.iterator().hasNext());
+
+ assertThat(result.iterator().next().equals("value"));
+
+ mockedKafkaConsumer.close();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
+
+ @Test
+ public void testKafkaConsumerWrapperClose() {
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
+ @Test
+ public void testKafkaConsumerWrapperToString() {
+ assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+ private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
+
+ protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
+ super(busTopicParams);
+ }
+
+ @Override
+ public Iterable<String> fetch() {
+ return null;
+ }
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
deleted file mode 100644
index 5a933e9b..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapAafPublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapDmePublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapPublisherWrapper;
-
-public class BusPublisherTest extends TopicTestBase {
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- }
-
- @Test
- public void testCambriaPublisherWrapper() {
- // verify that different wrappers can be built
- new CambriaPublisherWrapper(makeBuilder().build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().userName(null).build());
- new CambriaPublisherWrapper(makeBuilder().password(null).build());
- new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build());
- }
-
- @Test
- public void testCambriaPublisherWrapperSend() throws Exception {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE));
-
- // publisher exception
- when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED));
- assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testCambriaPublisherWrapperSend_InvalidMsg() {
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = mock(CambriaBatchingPublisher.class);
-
- cambria.send(MY_PARTITION, null);
- }
-
- @Test
- public void testCambriaPublisherWrapperClose() {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- cambria.close();
- verify(pub).close();
-
- // try again, this time with an exception
- doThrow(new RuntimeException(EXPECTED)).when(pub).close();
- cambria.close();
- }
-
- @Test
- public void testDmaapPublisherWrapper() {
- // verify with different constructor arguments
- new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, false);
- new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_InvalidTopic() {
- new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, "", MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_Aaf_NullServers() {
- new DmaapAafPublisherWrapper(null, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_Aaf_NoServers() {
- new DmaapAafPublisherWrapper(Collections.emptyList(), MY_TOPIC, MY_USERNAME, MY_PASS, true);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_InvalidProtocol() {
- new DmaapPublisherWrapper(ProtocolTypeConstants.HTTPNOAUTH, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test
- public void testDmaapPublisherWrapperClose() throws Exception {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- dmaap.close();
- verify(pub).close(anyLong(), any(TimeUnit.class));
-
- // close, but with exception from publisher
- doThrow(new IOException(EXPECTED)).when(pub).close(anyLong(), any(TimeUnit.class));
- dmaap.close();
- }
-
- @Test
- public void testDmaapPublisherWrapperSend() {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- // null response
- assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
- verify(pub).setPubResponse(any(MRPublisherResponse.class));
- verify(pub).send(MY_PARTITION, MY_MESSAGE);
-
- // with response
- pub = mock(MRSimplerBatchPublisher.class);
- dmaap.publisher = pub;
-
- MRPublisherResponse resp = new MRPublisherResponse();
- when(pub.sendBatchWithResponse()).thenReturn(resp);
- assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
- verify(pub).setPubResponse(any(MRPublisherResponse.class));
- verify(pub).send(MY_PARTITION, MY_MESSAGE);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapperSend_NullMessage() {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- dmaap.send(MY_PARTITION, null);
- }
-
- @Test
- public void testDmaapDmePublisherWrapper() {
- // verify with different parameters
- new DmaapDmePublisherWrapper(makeBuilder().build());
- new DmaapDmePublisherWrapper(makeBuilder().additionalProps(null).build());
-
- addProps.put(ROUTE_PROP, MY_ROUTE);
- new DmaapDmePublisherWrapper(makeBuilder().build());
- new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
-
- addProps.put("null-value", null);
- new DmaapDmePublisherWrapper(makeBuilder().build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidEnv() {
- new DmaapDmePublisherWrapper(makeBuilder().environment(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidAft() {
- new DmaapDmePublisherWrapper(makeBuilder().aftEnvironment(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidLat() {
- new DmaapDmePublisherWrapper(makeBuilder().latitude(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidLong() {
- new DmaapDmePublisherWrapper(makeBuilder().longitude(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidPartner() {
- new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
index 01028045..0a2a5d34 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -53,7 +54,7 @@ public class BusTopicBaseTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(base, BusTopicBaseTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(base, BusTopicBaseTest.class)).doesNotThrowAnyException();
}
@Test
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
index c00f2b56..3abb8b10 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
import java.util.LinkedList;
+import java.util.List;
import java.util.function.BiConsumer;
import org.junit.Before;
import org.junit.Test;
@@ -41,12 +42,12 @@ public class BusTopicParamsTest extends TopicTestBase {
}
@Test
- public void test() {
+ public void testGetters() {
BusTopicParams params = makeBuilder().build();
assertEquals(addProps, params.getAdditionalProps());
assertEquals(MY_AFT_ENV, params.getAftEnvironment());
- assertEquals(true, params.isAllowSelfSignedCerts());
+ assertTrue(params.isAllowSelfSignedCerts());
assertEquals(MY_API_KEY, params.getApiKey());
assertEquals(MY_API_SECRET, params.getApiSecret());
assertEquals(MY_BASE_PATH, params.getBasePath());
@@ -59,7 +60,7 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(MY_HOST, params.getHostname());
assertEquals(MY_LAT, params.getLatitude());
assertEquals(MY_LONG, params.getLongitude());
- assertEquals(true, params.isManaged());
+ assertTrue(params.isManaged());
assertEquals(MY_PARTITION, params.getPartitionId());
assertEquals(MY_PARTNER, params.getPartner());
assertEquals(MY_PASS, params.getPassword());
@@ -67,13 +68,21 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(servers, params.getServers());
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
- assertEquals(true, params.isUseHttps());
+ assertTrue(params.isUseHttps());
assertEquals(MY_USERNAME, params.getUserName());
+ }
+ @Test
+ public void testBooleanGetters() {
// ensure that booleans are independent of each other
- testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag));
- testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag));
- testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag));
+ testBoolean("true:false:false", TopicParamsBuilder::allowSelfSignedCerts);
+ testBoolean("false:true:false", TopicParamsBuilder::managed);
+ testBoolean("false:false:true", TopicParamsBuilder::useHttps);
+ }
+
+ @Test
+ public void testValidators() {
+ BusTopicParams params = makeBuilder().build();
// test validity methods
assertTrue(params.isAdditionalPropsValid());
@@ -94,8 +103,10 @@ public class BusTopicParamsTest extends TopicTestBase {
assertFalse(params.isServersInvalid());
assertFalse(params.isTopicInvalid());
assertTrue(params.isUserNameValid());
+ }
- // test inverted validity
+ @Test
+ public void testInvertedValidators() {
assertFalse(makeBuilder().additionalProps(null).build().isAdditionalPropsValid());
assertTrue(makeBuilder().aftEnvironment("").build().isAftEnvironmentInvalid());
assertFalse(makeBuilder().apiKey("").build().isApiKeyValid());
@@ -114,15 +125,15 @@ public class BusTopicParamsTest extends TopicTestBase {
assertTrue(makeBuilder().port(65536).build().isPortInvalid());
assertTrue(makeBuilder().servers(null).build().isServersInvalid());
assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid());
- assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid());
- assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid());
+ assertTrue(makeBuilder().servers(List.of("")).build().isServersInvalid());
+ assertFalse(makeBuilder().servers(List.of("one-server")).build().isServersInvalid());
assertTrue(makeBuilder().topic("").build().isTopicInvalid());
assertFalse(makeBuilder().userName("").build().isUserNameValid());
}
/**
* Tests the boolean methods by applying a function, once with {@code false} and once
- * with {@code true}. Verifies that all of the boolean methods return the correct
+ * with {@code true}. Verifies that all the boolean methods return the correct
* value by concatenating them.
*
* @param expectedTrue the string that is expected when {@code true} is passed to the
@@ -137,7 +148,7 @@ public class BusTopicParamsTest extends TopicTestBase {
BusTopicParams params = builder.build();
assertEquals("false:false:false",
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
// now try the "true" case
@@ -145,6 +156,6 @@ public class BusTopicParamsTest extends TopicTestBase {
params = builder.build();
assertEquals(expectedTrue,
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
index 634ee762..7aa70b2a 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +64,8 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class))
+ .doesNotThrowAnyException();
}
@Test
@@ -122,7 +126,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
verify(pub).send(MY_PARTITION, MY_MESSAGE);
verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
- assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
+ assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
// arrange for send to throw an exception
when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
@@ -136,8 +140,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_NullMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(null);
}
@@ -145,16 +148,14 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_EmptyMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send("");
}
@Test(expected = IllegalStateException.class)
public void testSend_NotStarted() {
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(MY_MESSAGE);
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java
deleted file mode 100644
index d9bc990b..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class InlineDmaapTopicSinkTest extends TopicTestBase {
- private InlineDmaapTopicSink sink;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- sink = new InlineDmaapTopicSink(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- sink.shutdown();
- }
-
- @Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineDmaapTopicSinkTest.class);
- }
-
- @Test
- public void testToString() {
- assertTrue(sink.toString().startsWith("InlineDmaapTopicSink ["));
- }
-
- @Test
- public void testInit() {
- // nothing null
- sink = new InlineDmaapTopicSink(makeBuilder().build());
- sink.init();
- sink.shutdown();
-
- // no DME2 info
- sink = new InlineDmaapTopicSink(makeBuilder().environment(null).aftEnvironment(null).latitude(null)
- .longitude(null).partner(null).build());
- sink.init();
- sink.shutdown();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.DMAAP, sink.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
index a45504f2..643025c2 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,10 +29,9 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-public class InlineUebTopicSinkTest extends TopicTestBase {
- private InlineUebTopicSink sink;
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+ private InlineKafkaTopicSink sink;
/**
* Creates the object to be tested.
@@ -41,7 +41,7 @@ public class InlineUebTopicSinkTest extends TopicTestBase {
public void setUp() {
super.setUp();
- sink = new InlineUebTopicSink(makeBuilder().build());
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
}
@After
@@ -50,23 +50,21 @@ public class InlineUebTopicSinkTest extends TopicTestBase {
}
@Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineUebTopicSinkTest.class);
- }
-
- @Test
public void testToString() {
- assertTrue(sink.toString().startsWith("InlineUebTopicSink ["));
+ assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
}
@Test
public void testInit() {
+ // nothing null
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
sink.init();
+ assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
}
@Test
public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, sink.getTopicCommInfrastructure());
+ assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
index 16d74df2..dbdd8813 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -42,8 +44,8 @@ import org.mockito.stubbing.Answer;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.common.utils.gson.GsonTestUtils;
+import org.onap.policy.common.utils.network.NetworkUtil;
public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
private Thread thread;
@@ -72,7 +74,8 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class))
+ .doesNotThrowAnyException();
}
@Test
@@ -159,11 +162,30 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
@Test
public void testSingleThreadedBusTopicSource() {
+ // Note: if the value contains "-", it's probably a UUID
+
// verify that different wrappers can be built
- new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build());
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
+ assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
+ assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
+
+ // group is null => group is UUID, instance is as provided
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
+ assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+ assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
+
+ // instance is null => group is as provided, instance is UUID
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
+ assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
+ assertThat(source.getConsumerInstance()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+
+ // group & instance are null => group is UUID, instance is hostname
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).consumerInstance(null).build());
+ assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+ assertThat(source.getConsumerInstance()).isEqualTo(NetworkUtil.getHostname());
+
+ assertThatCode(() -> new SingleThreadedBusTopicSourceImpl(
+ makeBuilder().fetchLimit(-1).fetchTimeout(-1).build())).doesNotThrowAnyException();
}
@Test
@@ -284,22 +306,6 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
}
@Test
- public void testSetFilter() {
- FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
- cons = filt;
-
- source.start();
- source.setFilter("my-filter");
- verify(filt).setFilter("my-filter");
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSetFilter_Unsupported() {
- source.start();
- source.setFilter("unsupported-filter");
- }
-
- @Test
public void testGetConsumerGroup() {
assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java
deleted file mode 100644
index b7faf161..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.net.MalformedURLException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class SingleThreadedDmaapTopicSourceTest extends TopicTestBase {
- private static final String SOURCE_NAME = "SingleThreadedDmaapTopicSource [";
- private SingleThreadedDmaapTopicSource source;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- source = new SingleThreadedDmaapTopicSource(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- source.shutdown();
- }
-
- @Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedDmaapTopicSourceTest.class);
- }
-
- @Test
- public void testToString() {
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
-
- // try with null password
- source = new SingleThreadedDmaapTopicSource(makeBuilder().password(null).build());
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
-
- // try with empty password
- source = new SingleThreadedDmaapTopicSource(makeBuilder().password("").build());
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
- }
-
- @Test
- public void testInit() {
- // verify with different parameters
- new SingleThreadedDmaapTopicSource(makeBuilder().userName(null).build()).shutdown();
- new SingleThreadedDmaapTopicSource(makeBuilder().environment(null).aftEnvironment(null).latitude(null)
- .longitude(null).partner(null).build()).shutdown();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testSingleThreadedDmaapTopicSource_Ex() {
- new SingleThreadedDmaapTopicSource(makeBuilder().build()) {
- @Override
- public void init() throws MalformedURLException {
- throw new MalformedURLException(EXPECTED);
- }
- }.shutdown();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.DMAAP, source.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
index 2ff353b8..6b63c9f4 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -30,8 +31,8 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
import org.onap.policy.common.utils.gson.GsonTestUtils;
-public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
- private SingleThreadedUebTopicSource source;
+public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
+ private SingleThreadedKafkaTopicSource source;
/**
* Creates the object to be tested.
@@ -41,7 +42,7 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
public void setUp() {
super.setUp();
- source = new SingleThreadedUebTopicSource(makeBuilder().build());
+ source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
}
@After
@@ -49,20 +50,20 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
source.shutdown();
}
- @Test
public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedUebTopicSourceTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+ .doesNotThrowAnyException();
}
@Test
public void testToString() {
- assertTrue(source.toString().startsWith("SingleThreadedUebTopicSource ["));
+ assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
source.shutdown();
}
@Test
public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, source.getTopicCommInfrastructure());
+ assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
index 0cf1486f..0f09b12e 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -97,7 +98,7 @@ public class TopicBaseTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(base, TopicBaseTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(base, TopicBaseTest.class)).doesNotThrowAnyException();
}
@Test
@@ -193,9 +194,15 @@ public class TopicBaseTest extends TopicTestBase {
assertTrue(base.unlock());
assertEquals(1, base.startCount);
assertEquals(1, base.stopCount);
+ }
+
+ /**
+ * Tests lock/unlock when the stop/start methods return false.
+ */
+ @Test
+ public void testLock_testUnlock_FalseReturns() {
// lock, but stop returns false
- base = new TopicBaseImpl(servers, MY_TOPIC);
base.stopReturn = false;
assertFalse(base.lock());
assertTrue(base.isLocked());
@@ -206,9 +213,15 @@ public class TopicBaseTest extends TopicTestBase {
assertFalse(base.unlock());
assertFalse(base.isLocked());
assertTrue(base.unlock());
+ }
+
+ /**
+ * Tests lock/unlock when the start method throws an exception.
+ */
+ @Test
+ public void testLock_testUnlock_Exception() {
// lock & re-lock, but start throws an exception
- base = new TopicBaseImpl(servers, MY_TOPIC);
base.startEx = true;
assertTrue(base.lock());
assertFalse(base.unlock());
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
new file mode 100644
index 00000000..704b2cb0
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -0,0 +1,453 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.client;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BidirectionalTopicClientTest {
+ private static final Coder coder = new StandardCoder();
+ private static final long MAX_WAIT_MS = 5000;
+ private static final long SHORT_WAIT_MS = 1;
+ private static final String SINK_TOPIC = "my-sink-topic";
+ private static final String SOURCE_TOPIC = "my-source-topic";
+ private static final String MY_TEXT = "my-text";
+
+ private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+ private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
+
+ @Mock
+ private TopicSink sink;
+ @Mock
+ private TopicSource source;
+ @Mock
+ private TopicEndpoint endpoint;
+ @Mock
+ private TopicListener listener;
+
+ private MyMessage theMessage;
+
+ private BidirectionalTopicClient client;
+ private Context context;
+
+ /**
+ * Configures the endpoints.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ Properties props = new Properties();
+ props.setProperty("noop.sink.topics", SINK_TOPIC);
+ props.setProperty("noop.source.topics", SOURCE_TOPIC);
+
+ // clear all topics and then configure one sink and one source
+ TopicEndpointManager.getManager().shutdown();
+ TopicEndpointManager.getManager().addTopicSinks(props);
+ TopicEndpointManager.getManager().addTopicSources(props);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ // clear all topics after the tests
+ TopicEndpointManager.getManager().shutdown();
+ }
+
+ /**
+ * Creates mocks and an initial client object.
+ */
+ @Before
+ public void setUp() throws Exception {
+ when(sink.send(anyString())).thenReturn(true);
+ when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
+
+ when(source.offer(anyString())).thenReturn(true);
+ when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
+
+ when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
+ when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
+
+ when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
+ when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
+
+ theMessage = new MyMessage(MY_TEXT);
+
+ client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
+
+ context = new Context();
+ }
+
+ @After
+ public void tearDown() {
+ context.stop();
+ }
+
+ @Test
+ public void testBidirectionalTopicClient_testGetters() {
+ assertSame(sink, client.getSink());
+ assertSame(source, client.getSource());
+ assertEquals(SINK_TOPIC, client.getSinkTopic());
+ assertEquals(SOURCE_TOPIC, client.getSourceTopic());
+ assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
+ assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
+ }
+
+ /**
+ * Tests the constructor when the sink or source cannot be found.
+ */
+ @Test
+ public void testBidirectionalTopicClientExceptions() {
+ assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
+ .isInstanceOf(BidirectionalTopicClientException.class)
+ .hasMessage("no sinks for topic: unknown-sink");
+
+ assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
+ .isInstanceOf(BidirectionalTopicClientException.class)
+ .hasMessage("no sources for topic: unknown-source");
+
+ // too many sources
+ when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
+
+ assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
+ .isInstanceOf(BidirectionalTopicClientException.class)
+ .hasMessage("too many sources for topic: my-source-topic");
+ }
+
+ /**
+ * Tests the "delegate" methods.
+ */
+ @Test
+ public void testDelegates() {
+ assertTrue(client.send("hello"));
+ verify(sink).send("hello");
+
+ assertTrue(client.offer("incoming"));
+ verify(source).offer("incoming");
+
+ client.register(listener);
+ verify(source).register(listener);
+
+ client.unregister(listener);
+ verify(source).unregister(listener);
+ }
+
+ @Test
+ public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
+ // use a real manager
+ client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
+ assertNotNull(client.getTopicEndpointManager());
+
+ assertNotNull(client.getSink());
+ assertNotNull(client.getSource());
+
+ assertNotSame(sink, client.getSink());
+ assertNotSame(source, client.getSource());
+ }
+
+ @Test
+ public void testAwaitReceipt() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ verify(source).register(any());
+ verify(sink, atLeast(1)).send(any());
+ assertThat(context.checker.isReady()).isFalse();
+
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_AlreadyDone() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ inject(theMessage);
+
+ verifyReceipt();
+
+ // calling again should result in "true" again, without injecting message
+ context.start(theMessage);
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ // non-matching message
+ inject("{}");
+
+ // wait for a few more calls to "send" and then inject a matching message
+ assertThat(context.awaitSend(3)).isTrue();
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_DecodeFails() throws Exception {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ // force a failure and inject the message
+ context.forceDecodeFailure = true;
+ inject(theMessage);
+
+ assertThat(context.awaitDecodeFailure()).isTrue();
+
+ // no more failures
+ context.forceDecodeFailure = false;
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_Interrupted() throws InterruptedException {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ context.interrupt();
+
+ verifyNoReceipt();
+ }
+
+ @Test
+ public void testAwaitReceipt_MultipleLoops() throws Exception {
+ context.start(theMessage);
+
+ // wait for multiple "send" calls
+ assertThat(context.awaitSend(3)).isTrue();
+
+ inject(theMessage);
+
+ verifyReceipt();
+ }
+
+ @Test
+ public void testStop() throws InterruptedException {
+ context.start(theMessage);
+ assertThat(context.awaitSend(1)).isTrue();
+
+ context.stop();
+
+ verifyNoReceipt();
+ }
+
+ /**
+ * Verifies that awaitReceipt() returns {@code true}.
+ *
+ * @throws InterruptedException if interrupted while waiting for the thread to
+ * terminate
+ */
+ private void verifyReceipt() throws InterruptedException {
+ assertThat(context.join()).isTrue();
+ assertThat(context.result).isTrue();
+ assertThat(context.exception).isNull();
+ assertThat(context.checker.isReady()).isTrue();
+
+ verify(source).unregister(any());
+ }
+
+ /**
+ * Verifies that awaitReceipt() returns {@code false}.
+ *
+ * @throws InterruptedException if interrupted while waiting for the thread to
+ * terminate
+ */
+ private void verifyNoReceipt() throws InterruptedException {
+ assertThat(context.join()).isTrue();
+ assertThat(context.result).isFalse();
+ assertThat(context.exception).isNull();
+ assertThat(context.checker.isReady()).isFalse();
+
+ verify(source).unregister(any());
+ }
+
+ /**
+ * Injects a message into the source topic.
+ *
+ * @param message message to be injected
+ * @throws CoderException if the message cannot be encoded
+ */
+ private void inject(MyMessage message) throws CoderException {
+ inject(coder.encode(message));
+ }
+
+ /**
+ * Injects a message into the source topic.
+ *
+ * @param message message to be injected
+ */
+ private void inject(String message) {
+ ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
+ verify(source).register(cap.capture());
+
+ cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
+ }
+
+
+ /**
+ * BidirectionalTopicClient with some overrides.
+ */
+ private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
+
+ public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
+ throws BidirectionalTopicClientException {
+ super(sinkTopic, sourceTopic);
+ }
+
+ @Override
+ protected TopicEndpoint getTopicEndpointManager() {
+ return endpoint;
+ }
+ }
+
+ private class Context {
+ private Thread thread;
+ private boolean result;
+ private Exception exception;
+ private boolean forceDecodeFailure;
+
+ // released every time the checker publishes a message
+ private final Semaphore sendSem = new Semaphore(0);
+
+ // released every time a message-decode fails
+ private final Semaphore decodeFailedSem = new Semaphore(0);
+
+ private final BidirectionalTopicClient2 checker;
+
+ public Context() throws BidirectionalTopicClientException {
+
+ checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
+
+ @Override
+ public boolean send(String messageText) {
+ boolean result = super.send(messageText);
+ sendSem.release();
+ return result;
+ }
+
+ @Override
+ protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
+ if (forceDecodeFailure) {
+ throw new CoderException("expected exception");
+ }
+
+ return super.decode(msg, clazz);
+ }
+
+ @Override
+ protected void decodeFailed() {
+ super.decodeFailed();
+ decodeFailedSem.release();
+ }
+ };
+ }
+
+ /**
+ * Starts the thread.
+ *
+ * @param message message to be sent to the sink topic
+ */
+ public void start(MyMessage message) {
+ thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ result = checker.awaitReady(message, SHORT_WAIT_MS);
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void stop() {
+ checker.stopWaiting();
+ }
+
+ public boolean join() throws InterruptedException {
+ thread.join(MAX_WAIT_MS);
+ return !thread.isAlive();
+ }
+
+ public void interrupt() {
+ thread.interrupt();
+ }
+
+ public boolean awaitSend(int npermits) throws InterruptedException {
+ return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+
+ public boolean awaitDecodeFailure() throws InterruptedException {
+ return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class MyMessage {
+ private String text;
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java
index c0814703..7b64a20f 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP PAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,10 +26,11 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onap.policy.common.utils.test.ExceptionsTester;
-public class TopicSinkClientExceptionTest {
+public class TopicClientExceptionTest {
@Test
public void test() {
assertEquals(5, new ExceptionsTester().test(TopicSinkClientException.class));
+ assertEquals(5, new ExceptionsTester().test(BidirectionalTopicClientException.class));
}
}