diff options
author | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-08-25 15:00:58 +0100 |
---|---|---|
committer | Sirisha_Manchikanti <sirisha.manchikanti@est.tech> | 2022-09-21 10:54:56 +0100 |
commit | 775b9f1e14a246b2df0a65bd63c0775120659f35 (patch) | |
tree | 7f768cbbf9859fe420f6b4e923105425837af85b /policy-endpoints/src/test/java/org/onap | |
parent | b433dd58dc50b5c59f84ea18908b5e1a0f25f78a (diff) |
Publish and Subscribe to Kafka topic
Issue-ID: POLICY-4134
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a
Diffstat (limited to 'policy-endpoints/src/test/java/org/onap')
7 files changed, 372 insertions, 12 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java index 1a815e1a..a00879c1 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -32,7 +32,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameters; public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { - public static final String SERVER = "my-server"; + public static final String SERVER = "localhost:9092"; public static final String TOPIC2 = "my-topic-2"; @Getter diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java new file mode 100644 index 00000000..c109e70a --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java @@ -0,0 +1,192 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; + +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> { + + private SinkFactory factory; + public static final String KAFKA_SERVER = "localhost:9092"; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + factory = new SinkFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + @Override + public void testBuildBusTopicParams() { + super.testBuildBusTopicParams(); + super.testBuildBusTopicParams_Ex(); + } + + @Test + @Override + public void testBuildListOfStringString() { + super.testBuildListOfStringString(); + + // check parameters that were used + BusTopicParams params = getLastParams(); + assertEquals(false, params.isAllowSelfSignedCerts()); + } + + @Test + @Override + public void testBuildProperties() { + List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); + assertEquals(1, topics.size()); + assertEquals(MY_TOPIC, topics.get(0).getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); + + BusTopicParams params = getLastParams(); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); + assertEquals(MY_PARTITION, params.getPartitionId()); + + List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3) + .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build()); + assertEquals(1, topics2.size()); + assertEquals(TOPIC3, topics2.get(0).getTopic()); + assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic()); + + initFactory(); + + assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); + } + + @Test + @Override + public void testDestroyString_testGet_testInventory() { + super.testDestroyString_testGet_testInventory(); + super.testDestroyString_Ex(); + } + + @Test + @Override + public void testDestroy() { + super.testDestroy(); + } + + @Test + public void testGet() { + super.testGet_Ex(); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory [")); + } + + @Override + protected void initFactory() { + if (factory != null) { + factory.destroy(); + } + + factory = new SinkFactory(); + } + + @Override + protected List<KafkaTopicSink> buildTopics(Properties properties) { + return factory.build(properties); + } + + @Override + protected KafkaTopicSink buildTopic(BusTopicParams params) { + return factory.build(params); + } + + @Override + protected KafkaTopicSink buildTopic(List<String> servers, String topic) { + return factory.build(servers, topic); + } + + @Override + protected void destroyFactory() { + factory.destroy(); + } + + @Override + protected void destroyTopic(String topic) { + factory.destroy(topic); + } + + @Override + protected List<KafkaTopicSink> getInventory() { + return factory.inventory(); + } + + @Override + protected KafkaTopicSink getTopic(String topic) { + return factory.get(topic); + } + + @Override + protected BusTopicParams getLastParams() { + return factory.params.getLast(); + } + + @Override + protected TopicPropertyBuilder makePropBuilder() { + return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS); + } + + /** + * Factory that records the parameters of all of the sinks it creates. + */ + private static class SinkFactory extends IndexedKafkaTopicSinkFactory { + private Deque<BusTopicParams> params = new LinkedList<>(); + + @Override + protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSink(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java index 6fa80a41..3a62b4a7 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; +import java.util.Arrays; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -40,6 +42,8 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka private SourceFactory factory; + public static final String KAFKA_SERVER = "localhost:9092"; + /** * Creates the object to be tested. */ @@ -58,12 +62,6 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka @Test @Override - public void testBuildBusTopicParams() { - super.testBuildBusTopicParams_Ex(); - } - - @Test - @Override public void testBuildProperties() { initFactory(); @@ -71,15 +69,30 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); assertEquals(1, topics.size()); assertEquals(MY_TOPIC, topics.get(0).getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); + + BusTopicParams params = getLastParams(); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); } @Test @Override public void testDestroyString_testGet_testInventory() { + super.testDestroyString_testGet_testInventory(); super.testDestroyString_Ex(); } @Test + @Override + public void testDestroy() { + super.testDestroy(); + } + + @Test public void testGet() { super.testGet_Ex(); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java index 8b75fa35..bd88eec9 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java @@ -59,6 +59,8 @@ public class TopicTestBase { public static final String ROUTE_PROP = "routeOffer"; public static final String MY_ROUTE = "my-route"; + public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + public static final int KAFKA_PORT = 9092; /** * Message used within exceptions that are expected. @@ -76,6 +78,11 @@ public class TopicTestBase { protected List<String> servers; /** + * Servers to be added to the parameter builder. + */ + protected List<String> kafkaServers; + + /** * Parameter builder used to build topic parameters. */ protected TopicParamsBuilder builder; @@ -89,13 +96,14 @@ public class TopicTestBase { addProps.put("my-key-B", "my-value-B"); servers = Arrays.asList("svra", "svrb"); + kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092"); builder = makeBuilder(); } /** * Makes a fully populated parameter builder. - * + * * @return a new parameter builder */ public TopicParamsBuilder makeBuilder() { @@ -117,6 +125,39 @@ public class TopicTestBase { .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT) .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER) .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME); + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME) + .serializationProvider(MY_SERIALIZER); + } + + /** + * Makes a fully populated parameter builder. + * + * @return a new parameter builder + */ + public TopicParamsBuilder makeKafkaBuilder() { + addProps.clear(); + String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=abc password=abc serviceName=kafka;"; + addProps.put("sasl.jaas.config", jaas); + addProps.put("sasl.mechanism", "SCRAM-SHA-512"); + addProps.put("security.protocol", "SASL_PLAINTEXT"); + + return makeKafkaBuilder(addProps, kafkaServers); + } + + /** + * Makes a fully populated parameter builder. + * + * @param addProps additional properties to be added to the builder + * @param servers servers to be added to the builder + * @return a new parameter builder + */ + public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) { + + return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME) + .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV) + .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER) + .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC) + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index da9f792b..7df5d129 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -34,8 +34,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.Before; import org.junit.Test; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -299,12 +302,46 @@ public class BusConsumerTest extends TopicTestBase { @Test public void testKafkaConsumerWrapper() throws Exception { // verify that different wrappers can be built - assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException(); + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException(); + } + + @Test(expected = IllegalArgumentException.class) + public void testKafkaConsumerWrapper_InvalidTopic() throws Exception { + new KafkaConsumerWrapper(makeBuilder().topic(null).build()); + } + + @Test(expected = java.lang.IllegalStateException.class) + public void testKafkaConsumerWrapperFetch() throws Exception { + + //Setup Properties for consumer + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); + kafkaProps.setProperty("enable.auto.commit", "true"); + kafkaProps.setProperty("auto.commit.interval.ms", "1000"); + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps); + kafka.consumer = consumer; + + assertFalse(kafka.fetch().iterator().hasNext()); + consumer.close(); + } + + @Test + public void testKafkaConsumerWrapperClose() throws Exception { + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); } @Test public void testKafkaConsumerWrapperToString() throws Exception { - assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString()); + assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString()); } private static class FetchingBusConsumerImpl extends FetchingBusConsumer { diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java new file mode 100644 index 00000000..b40b9541 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; +import org.onap.policy.common.utils.gson.GsonTestUtils; + +public class InlineKafkaTopicSinkTest extends TopicTestBase { + private InlineKafkaTopicSink sink; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); + } + + @After + public void tearDown() { + sink.shutdown(); + } + + @Test + public void testToString() { + assertTrue(sink.toString().startsWith("InlineKafkaTopicSink [")); + } + + @Test + public void testInit() { + // nothing null + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); + sink.init(); + assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException(); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java index cc096585..6b63c9f4 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java @@ -42,7 +42,7 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { public void setUp() { super.setUp(); - source = new SingleThreadedKafkaTopicSource(makeBuilder().build()); + source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build()); } @After @@ -50,9 +50,15 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { source.shutdown(); } + public void testSerialize() { + assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class)) + .doesNotThrowAnyException(); + } + @Test public void testToString() { assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource [")); + source.shutdown(); } @Test |