From 775b9f1e14a246b2df0a65bd63c0775120659f35 Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Thu, 25 Aug 2022 15:00:58 +0100 Subject: Publish and Subscribe to Kafka topic Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a --- .../comm/bus/IndexedKafkaTopicSourceFactory.java | 1 + .../event/comm/bus/internal/BusConsumer.java | 62 ++++++- .../event/comm/bus/internal/BusPublisher.java | 59 +++++-- .../comm/bus/internal/InlineKafkaTopicSink.java | 5 + .../internal/SingleThreadedKafkaTopicSource.java | 24 ++- .../properties/PolicyEndPointProperties.java | 1 - .../common/endpoints/utils/KafkaPropertyUtils.java | 6 +- .../event/comm/bus/KafkaTopicPropertyBuilder.java | 2 +- .../event/comm/bus/KafkaTopicSinkFactoryTest.java | 192 +++++++++++++++++++++ .../comm/bus/KafkaTopicSourceFactoryTest.java | 25 ++- .../endpoints/event/comm/bus/TopicTestBase.java | 45 ++++- .../event/comm/bus/internal/BusConsumerTest.java | 41 ++++- .../bus/internal/InlineKafkaTopicSinkTest.java | 71 ++++++++ .../SingleThreadedKafkaTopicSourceTest.java | 8 +- .../bus/internal/InlineKafkaTopicSinkTest.json | 26 ++- .../SingleThreadedKafkaTopicSourceTest.json | 24 ++- 16 files changed, 540 insertions(+), 52 deletions(-) create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java index 47279d47..45a8be3f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java @@ -65,6 +65,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory { } var kafkaTopicSource = makeSource(busTopicParams); + kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource); return kafkaTopicSource; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 8d88b0d9..ee41150f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -32,6 +32,8 @@ import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -42,6 +44,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder; @@ -235,10 +239,13 @@ public interface BusConsumer { */ private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class); + private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + /** * Kafka consumer. */ - private KafkaConsumer consumer; + protected KafkaConsumer consumer; + protected Properties kafkaProps; /** * Kafka Consumer Wrapper. @@ -250,20 +257,67 @@ public interface BusConsumer { * @throws GeneralSecurityException - Security exception * @throws MalformedURLException - Malformed URL exception */ - public KafkaConsumerWrapper(BusTopicParams busTopicParams) { + public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { super(busTopicParams); + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + busTopicParams.getServers().get(0)); + + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + + if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + } + if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + } + if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); + } + consumer = new KafkaConsumer<>(kafkaProps); + //Subscribe to the topic + consumer.subscribe(Arrays.asList(busTopicParams.getTopic())); } @Override public Iterable fetch() throws IOException { - // TODO: Not implemented yet - return new ArrayList<>(); + ConsumerRecords records = this.consumer.poll(Duration.ofMillis(fetchTimeout)); + if (records == null || records.count() <= 0) { + return Collections.emptyList(); + } + List messages = new ArrayList<>(records.count()); + try { + for (TopicPartition partition : records.partitions()) { + List> partitionRecords = records.records(partition); + for (ConsumerRecord record : partitionRecords) { + messages.add(record.value()); + } + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } catch (Exception e) { + logger.error("{}: cannot fetch because of {}", this, e.getMessage()); + sleepAfterFetchFailure(); + throw e; + } + return messages; } @Override public void close() { super.close(); this.consumer.close(); + logger.info("Kafka Consumer exited {}", this); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index e0df7095..fe9bab23 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -32,13 +32,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.CompressionType; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; @@ -155,22 +156,45 @@ public interface BusPublisher { public static class KafkaPublisherWrapper implements BusPublisher { private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class); + private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + + private String topic; /** - * The actual Kafka publisher. + * Kafka publisher. */ - private final KafkaProducer producer; + private Producer producer; + protected Properties kafkaProps; /** - * Constructor. + * Kafka Publisher Wrapper. * * @param busTopicParams topic parameters */ - public KafkaPublisherWrapper(BusTopicParams busTopicParams) { - // TODO Setting of topic parameters is not implemented yet. - //Setup Properties for Kafka Producer - Properties kafkaProps = new Properties(); - this.producer = new KafkaProducer(kafkaProps); + protected KafkaPublisherWrapper(BusTopicParams busTopicParams) { + + if (busTopicParams.isTopicInvalid()) { + throw new IllegalArgumentException("No topic for Kafka"); + } + + this.topic = busTopicParams.getTopic(); + + //Setup Properties for consumer + kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0)); + if (busTopicParams.isAdditionalPropsValid()) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + kafkaProps.put(entry.getKey(), entry.getValue()); + } + } + if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) { + kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); + } + + producer = new KafkaProducer<>(kafkaProps); } @Override @@ -178,7 +202,18 @@ public interface BusPublisher { if (message == null) { throw new IllegalArgumentException("No message provided"); } - // TODO Sending messages is not implemented yet + + try { + //Create the record + ProducerRecord record = new ProducerRecord(topic, + UUID.randomUUID().toString(), message); + + this.producer.send(record); + producer.flush(); + } catch (Exception e) { + logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e); + return false; + } return true; } @@ -186,7 +221,7 @@ public interface BusPublisher { public void close() { logger.info("{}: CLOSE", this); - try (this.producer) { + try { this.producer.close(); } catch (Exception e) { logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index b564229b..6574d408 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -18,6 +18,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink; import org.slf4j.Logger; @@ -34,6 +35,8 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop */ private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class); + protected Map additionalProps = null; + /** * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned * attributes. @@ -47,6 +50,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop */ public InlineKafkaTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); + this.additionalProps = busTopicParams.getAdditionalProps(); } /** @@ -59,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop .servers(this.servers) .topic(this.effectiveTopic) .useHttps(this.useHttps) + .additionalProps(this.additionalProps) .build()); logger.info("{}: KAFKA SINK created", this); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index b8362b83..2a651ee7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -18,6 +18,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.net.MalformedURLException; +import java.util.Map; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; @@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource; */ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource { + protected Map additionalProps = null; + /** * Constructor. * @@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource */ public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - this.init(); + this.additionalProps = busTopicParams.getAdditionalProps(); + try { + this.init(); + } catch (Exception e) { + throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e); + } } /** * Initialize the Cambria client. */ @Override - public void init() { - this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder() + public void init() throws MalformedURLException { + BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder() .servers(this.servers) .topic(this.effectiveTopic) - .useHttps(this.useHttps) - .build()); + .fetchTimeout(this.fetchTimeout) + .consumerGroup(this.consumerGroup) + .useHttps(this.useHttps); + + this.consumer = new BusConsumer.KafkaConsumerWrapper(builder + .additionalProps(this.additionalProps) + .build()); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index 49dff287..46a6c398 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -129,7 +129,6 @@ public final class PolicyEndPointProperties { public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - /* Topic Sink Values */ /** diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java index 3e62f98f..113a4bd1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -24,9 +24,13 @@ package org.onap.policy.common.endpoints.utils; import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; @@ -47,7 +51,7 @@ public class KafkaPropertyUtils { public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { final List serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); - //TODO More Kafka properties to be added + return BusTopicParams.builder() .servers(serverList) .topic(topic) diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java index 1a815e1a..a00879c1 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -32,7 +32,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameters; public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { - public static final String SERVER = "my-server"; + public static final String SERVER = "localhost:9092"; public static final String TOPIC2 = "my-topic-2"; @Getter diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java new file mode 100644 index 00000000..c109e70a --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java @@ -0,0 +1,192 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; + +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase { + + private SinkFactory factory; + public static final String KAFKA_SERVER = "localhost:9092"; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + factory = new SinkFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + @Override + public void testBuildBusTopicParams() { + super.testBuildBusTopicParams(); + super.testBuildBusTopicParams_Ex(); + } + + @Test + @Override + public void testBuildListOfStringString() { + super.testBuildListOfStringString(); + + // check parameters that were used + BusTopicParams params = getLastParams(); + assertEquals(false, params.isAllowSelfSignedCerts()); + } + + @Test + @Override + public void testBuildProperties() { + List topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); + assertEquals(1, topics.size()); + assertEquals(MY_TOPIC, topics.get(0).getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); + + BusTopicParams params = getLastParams(); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); + assertEquals(MY_PARTITION, params.getPartitionId()); + + List topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3) + .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build()); + assertEquals(1, topics2.size()); + assertEquals(TOPIC3, topics2.get(0).getTopic()); + assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic()); + + initFactory(); + + assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); + } + + @Test + @Override + public void testDestroyString_testGet_testInventory() { + super.testDestroyString_testGet_testInventory(); + super.testDestroyString_Ex(); + } + + @Test + @Override + public void testDestroy() { + super.testDestroy(); + } + + @Test + public void testGet() { + super.testGet_Ex(); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory [")); + } + + @Override + protected void initFactory() { + if (factory != null) { + factory.destroy(); + } + + factory = new SinkFactory(); + } + + @Override + protected List buildTopics(Properties properties) { + return factory.build(properties); + } + + @Override + protected KafkaTopicSink buildTopic(BusTopicParams params) { + return factory.build(params); + } + + @Override + protected KafkaTopicSink buildTopic(List servers, String topic) { + return factory.build(servers, topic); + } + + @Override + protected void destroyFactory() { + factory.destroy(); + } + + @Override + protected void destroyTopic(String topic) { + factory.destroy(topic); + } + + @Override + protected List getInventory() { + return factory.inventory(); + } + + @Override + protected KafkaTopicSink getTopic(String topic) { + return factory.get(topic); + } + + @Override + protected BusTopicParams getLastParams() { + return factory.params.getLast(); + } + + @Override + protected TopicPropertyBuilder makePropBuilder() { + return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS); + } + + /** + * Factory that records the parameters of all of the sinks it creates. + */ + private static class SinkFactory extends IndexedKafkaTopicSinkFactory { + private Deque params = new LinkedList<>(); + + @Override + protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSink(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java index 6fa80a41..3a62b4a7 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; +import java.util.Arrays; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -40,6 +42,8 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()); assertEquals(1, topics.size()); assertEquals(MY_TOPIC, topics.get(0).getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic()); + + BusTopicParams params = getLastParams(); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); } @Test @Override public void testDestroyString_testGet_testInventory() { + super.testDestroyString_testGet_testInventory(); super.testDestroyString_Ex(); } + @Test + @Override + public void testDestroy() { + super.testDestroy(); + } + @Test public void testGet() { super.testGet_Ex(); diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java index 8b75fa35..bd88eec9 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java @@ -59,6 +59,8 @@ public class TopicTestBase { public static final String ROUTE_PROP = "routeOffer"; public static final String MY_ROUTE = "my-route"; + public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + public static final int KAFKA_PORT = 9092; /** * Message used within exceptions that are expected. @@ -75,6 +77,11 @@ public class TopicTestBase { */ protected List servers; + /** + * Servers to be added to the parameter builder. + */ + protected List kafkaServers; + /** * Parameter builder used to build topic parameters. */ @@ -89,13 +96,14 @@ public class TopicTestBase { addProps.put("my-key-B", "my-value-B"); servers = Arrays.asList("svra", "svrb"); + kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092"); builder = makeBuilder(); } /** * Makes a fully populated parameter builder. - * + * * @return a new parameter builder */ public TopicParamsBuilder makeBuilder() { @@ -117,6 +125,39 @@ public class TopicTestBase { .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT) .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER) .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME); + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME) + .serializationProvider(MY_SERIALIZER); + } + + /** + * Makes a fully populated parameter builder. + * + * @return a new parameter builder + */ + public TopicParamsBuilder makeKafkaBuilder() { + addProps.clear(); + String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=abc password=abc serviceName=kafka;"; + addProps.put("sasl.jaas.config", jaas); + addProps.put("sasl.mechanism", "SCRAM-SHA-512"); + addProps.put("security.protocol", "SASL_PLAINTEXT"); + + return makeKafkaBuilder(addProps, kafkaServers); + } + + /** + * Makes a fully populated parameter builder. + * + * @param addProps additional properties to be added to the builder + * @param servers servers to be added to the builder + * @return a new parameter builder + */ + public TopicParamsBuilder makeKafkaBuilder(Map addProps, List servers) { + + return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME) + .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV) + .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER) + .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC) + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index da9f792b..7df5d129 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -34,8 +34,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.Before; import org.junit.Test; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -299,12 +302,46 @@ public class BusConsumerTest extends TopicTestBase { @Test public void testKafkaConsumerWrapper() throws Exception { // verify that different wrappers can be built - assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException(); + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException(); + } + + @Test(expected = IllegalArgumentException.class) + public void testKafkaConsumerWrapper_InvalidTopic() throws Exception { + new KafkaConsumerWrapper(makeBuilder().topic(null).build()); + } + + @Test(expected = java.lang.IllegalStateException.class) + public void testKafkaConsumerWrapperFetch() throws Exception { + + //Setup Properties for consumer + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); + kafkaProps.setProperty("enable.auto.commit", "true"); + kafkaProps.setProperty("auto.commit.interval.ms", "1000"); + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + KafkaConsumer consumer = new KafkaConsumer<>(kafkaProps); + kafka.consumer = consumer; + + assertFalse(kafka.fetch().iterator().hasNext()); + consumer.close(); + } + + @Test + public void testKafkaConsumerWrapperClose() throws Exception { + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); } @Test public void testKafkaConsumerWrapperToString() throws Exception { - assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString()); + assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString()); } private static class FetchingBusConsumerImpl extends FetchingBusConsumer { diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java new file mode 100644 index 00000000..b40b9541 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus.internal; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; +import org.onap.policy.common.utils.gson.GsonTestUtils; + +public class InlineKafkaTopicSinkTest extends TopicTestBase { + private InlineKafkaTopicSink sink; + + /** + * Creates the object to be tested. + */ + @Before + @Override + public void setUp() { + super.setUp(); + + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); + } + + @After + public void tearDown() { + sink.shutdown(); + } + + @Test + public void testToString() { + assertTrue(sink.toString().startsWith("InlineKafkaTopicSink [")); + } + + @Test + public void testInit() { + // nothing null + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); + sink.init(); + assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException(); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure()); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java index cc096585..6b63c9f4 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java @@ -42,7 +42,7 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { public void setUp() { super.setUp(); - source = new SingleThreadedKafkaTopicSource(makeBuilder().build()); + source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build()); } @After @@ -50,9 +50,15 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { source.shutdown(); } + public void testSerialize() { + assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class)) + .doesNotThrowAnyException(); + } + @Test public void testToString() { assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource [")); + source.shutdown(); } @Test diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json index 7c512a87..42b7a036 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json @@ -1,11 +1,19 @@ { - "servers" : [ "svra", "svrb" ], - "topic" : "my-topic", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "useHttps" : true, - "topicCommInfrastructure" : "KAFKA", - "partitionKey" : "my-partition" + "servers": [ + "svra", + "svrb" + ], + "topic": "my-topic", + "effectiveTopic": "my-effective-topic", + "recentEvents": [], + "alive": false, + "locked": false, + "useHttps": false, + "topicCommInfrastructure": "KAFKA", + "partitionKey": "my-partition", + "additionalProps": { + "security.protocol": "SASL_PLAINTEXT", + "sasl.mechanism": "SCRAM-SHA-512", + "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;" + } } diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json index 626d87e8..38cc2f8e 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json @@ -1,10 +1,18 @@ { - "servers" : [ "svra", "svrb" ], - "topic" : "my-topic", - "effectiveTopic" : "my-effective-topic", - "recentEvents" : [ ], - "alive" : false, - "locked" : false, - "useHttps" : true, - "topicCommInfrastructure" : "KAFKA" + "servers": [ + "localhost:9092", + "10.1.2.3:9092" + ], + "topic": "my-topic", + "effectiveTopic": "my-effective-topic", + "recentEvents": [], + "alive": false, + "locked": false, + "useHttps": false, + "topicCommInfrastructure": "KAFKA", + "additionalProps": { + "security.protocol": "SASL_PLAINTEXT", + "sasl.mechanism": "SCRAM-SHA-512", + "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;" + } } -- cgit 1.2.3-korg