summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/test/java/org/onap
diff options
context:
space:
mode:
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-08-25 15:00:58 +0100
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-09-21 10:54:56 +0100
commit775b9f1e14a246b2df0a65bd63c0775120659f35 (patch)
tree7f768cbbf9859fe420f6b4e923105425837af85b /policy-endpoints/src/test/java/org/onap
parentb433dd58dc50b5c59f84ea18908b5e1a0f25f78a (diff)
Publish and Subscribe to Kafka topic
Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech> Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a
Diffstat (limited to 'policy-endpoints/src/test/java/org/onap')
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java2
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java192
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java25
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java45
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java41
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java71
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java8
7 files changed, 372 insertions, 12 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
index 1a815e1a..a00879c1 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
@@ -32,7 +32,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameters;
public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
- public static final String SERVER = "my-server";
+ public static final String SERVER = "localhost:9092";
public static final String TOPIC2 = "my-topic-2";
@Getter
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
new file mode 100644
index 00000000..c109e70a
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
@@ -0,0 +1,192 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
+
+ private SinkFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ factory = new SinkFactory();
+ }
+
+ @After
+ public void tearDown() {
+ factory.destroy();
+ }
+
+ @Test
+ @Override
+ public void testBuildBusTopicParams() {
+ super.testBuildBusTopicParams();
+ super.testBuildBusTopicParams_Ex();
+ }
+
+ @Test
+ @Override
+ public void testBuildListOfStringString() {
+ super.testBuildListOfStringString();
+
+ // check parameters that were used
+ BusTopicParams params = getLastParams();
+ assertEquals(false, params.isAllowSelfSignedCerts());
+ }
+
+ @Test
+ @Override
+ public void testBuildProperties() {
+ List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+ assertEquals(1, topics.size());
+ assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
+ assertEquals(MY_PARTITION, params.getPartitionId());
+
+ List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
+ .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
+ assertEquals(1, topics2.size());
+ assertEquals(TOPIC3, topics2.get(0).getTopic());
+ assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
+
+ initFactory();
+
+ assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
+ }
+
+ @Test
+ @Override
+ public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
+ super.testDestroyString_Ex();
+ }
+
+ @Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
+ @Test
+ public void testGet() {
+ super.testGet_Ex();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
+ }
+
+ @Override
+ protected void initFactory() {
+ if (factory != null) {
+ factory.destroy();
+ }
+
+ factory = new SinkFactory();
+ }
+
+ @Override
+ protected List<KafkaTopicSink> buildTopics(Properties properties) {
+ return factory.build(properties);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(BusTopicParams params) {
+ return factory.build(params);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
+ return factory.build(servers, topic);
+ }
+
+ @Override
+ protected void destroyFactory() {
+ factory.destroy();
+ }
+
+ @Override
+ protected void destroyTopic(String topic) {
+ factory.destroy(topic);
+ }
+
+ @Override
+ protected List<KafkaTopicSink> getInventory() {
+ return factory.inventory();
+ }
+
+ @Override
+ protected KafkaTopicSink getTopic(String topic) {
+ return factory.get(topic);
+ }
+
+ @Override
+ protected BusTopicParams getLastParams() {
+ return factory.params.getLast();
+ }
+
+ @Override
+ protected TopicPropertyBuilder makePropBuilder() {
+ return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
+ }
+
+ /**
+ * Factory that records the parameters of all of the sinks it creates.
+ */
+ private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
+ private Deque<BusTopicParams> params = new LinkedList<>();
+
+ @Override
+ protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
+ params.add(busTopicParams);
+ return super.makeSink(busTopicParams);
+ }
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
index 6fa80a41..3a62b4a7 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
@@ -40,6 +42,8 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
private SourceFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
/**
* Creates the object to be tested.
*/
@@ -58,12 +62,6 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
@Test
@Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
public void testBuildProperties() {
initFactory();
@@ -71,15 +69,30 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
assertEquals(1, topics.size());
assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
}
@Test
@Override
public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
super.testDestroyString_Ex();
}
@Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
+ @Test
public void testGet() {
super.testGet_Ex();
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
index 8b75fa35..bd88eec9 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
@@ -59,6 +59,8 @@ public class TopicTestBase {
public static final String ROUTE_PROP = "routeOffer";
public static final String MY_ROUTE = "my-route";
+ public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final int KAFKA_PORT = 9092;
/**
* Message used within exceptions that are expected.
@@ -76,6 +78,11 @@ public class TopicTestBase {
protected List<String> servers;
/**
+ * Servers to be added to the parameter builder.
+ */
+ protected List<String> kafkaServers;
+
+ /**
* Parameter builder used to build topic parameters.
*/
protected TopicParamsBuilder builder;
@@ -89,13 +96,14 @@ public class TopicTestBase {
addProps.put("my-key-B", "my-value-B");
servers = Arrays.asList("svra", "svrb");
+ kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092");
builder = makeBuilder();
}
/**
* Makes a fully populated parameter builder.
- *
+ *
* @return a new parameter builder
*/
public TopicParamsBuilder makeBuilder() {
@@ -117,6 +125,39 @@ public class TopicTestBase {
.fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
.longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
.password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC)
- .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME);
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME)
+ .serializationProvider(MY_SERIALIZER);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder() {
+ addProps.clear();
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;";
+ addProps.put("sasl.jaas.config", jaas);
+ addProps.put("sasl.mechanism", "SCRAM-SHA-512");
+ addProps.put("security.protocol", "SASL_PLAINTEXT");
+
+ return makeKafkaBuilder(addProps, kafkaServers);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @param addProps additional properties to be added to the builder
+ * @param servers servers to be added to the builder
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) {
+
+ return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+ .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+ .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER)
+ .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC)
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false);
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
index da9f792b..7df5d129 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -34,8 +34,11 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.collections4.IteratorUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@@ -299,12 +302,46 @@ public class BusConsumerTest extends TopicTestBase {
@Test
public void testKafkaConsumerWrapper() throws Exception {
// verify that different wrappers can be built
- assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
+ new KafkaConsumerWrapper(makeBuilder().topic(null).build());
+ }
+
+ @Test(expected = java.lang.IllegalStateException.class)
+ public void testKafkaConsumerWrapperFetch() throws Exception {
+
+ //Setup Properties for consumer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ kafkaProps.setProperty("enable.auto.commit", "true");
+ kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+ kafka.consumer = consumer;
+
+ assertFalse(kafka.fetch().iterator().hasNext());
+ consumer.close();
+ }
+
+ @Test
+ public void testKafkaConsumerWrapperClose() throws Exception {
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
}
@Test
public void testKafkaConsumerWrapperToString() throws Exception {
- assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
+ assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
}
private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
new file mode 100644
index 00000000..b40b9541
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
+import org.onap.policy.common.utils.gson.GsonTestUtils;
+
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+ private InlineKafkaTopicSink sink;
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ }
+
+ @After
+ public void tearDown() {
+ sink.shutdown();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
+ }
+
+ @Test
+ public void testInit() {
+ // nothing null
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ sink.init();
+ assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testGetTopicCommInfrastructure() {
+ assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
+ }
+
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
index cc096585..6b63c9f4 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
@@ -42,7 +42,7 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
public void setUp() {
super.setUp();
- source = new SingleThreadedKafkaTopicSource(makeBuilder().build());
+ source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
}
@After
@@ -50,9 +50,15 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
source.shutdown();
}
+ public void testSerialize() {
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+ .doesNotThrowAnyException();
+ }
+
@Test
public void testToString() {
assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
+ source.shutdown();
}
@Test