aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-08-25 15:00:58 +0100
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>2022-09-21 10:54:56 +0100
commit775b9f1e14a246b2df0a65bd63c0775120659f35 (patch)
tree7f768cbbf9859fe420f6b4e923105425837af85b
parentb433dd58dc50b5c59f84ea18908b5e1a0f25f78a (diff)
Publish and Subscribe to Kafka topic
Issue-ID: POLICY-4134 Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech> Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java62
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java59
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java5
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java24
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java6
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java2
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java192
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java25
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java45
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java41
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java71
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java8
-rw-r--r--policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json26
-rw-r--r--policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json24
16 files changed, 540 insertions, 52 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
index 47279d47..45a8be3f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
@@ -65,6 +65,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
}
var kafkaTopicSource = makeSource(busTopicParams);
+
kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
return kafkaTopicSource;
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 8d88b0d9..ee41150f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -32,6 +32,8 @@ import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -42,6 +44,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -235,10 +239,13 @@ public interface BusConsumer {
*/
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+ private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
/**
* Kafka consumer.
*/
- private KafkaConsumer<String, String> consumer;
+ protected KafkaConsumer<String, String> consumer;
+ protected Properties kafkaProps;
/**
* Kafka Consumer Wrapper.
@@ -250,20 +257,67 @@ public interface BusConsumer {
* @throws GeneralSecurityException - Security exception
* @throws MalformedURLException - Malformed URL exception
*/
- public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+ public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
super(busTopicParams);
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ busTopicParams.getServers().get(0));
+
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ }
+ if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
+ }
+ consumer = new KafkaConsumer<>(kafkaProps);
+ //Subscribe to the topic
+ consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
}
@Override
public Iterable<String> fetch() throws IOException {
- // TODO: Not implemented yet
- return new ArrayList<>();
+ ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+ if (records == null || records.count() <= 0) {
+ return Collections.emptyList();
+ }
+ List<String> messages = new ArrayList<>(records.count());
+ try {
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(record.value());
+ }
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ } catch (Exception e) {
+ logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+ sleepAfterFetchFailure();
+ throw e;
+ }
+ return messages;
}
@Override
public void close() {
super.close();
this.consumer.close();
+ logger.info("Kafka Consumer exited {}", this);
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index e0df7095..fe9bab23 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -32,13 +32,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.record.CompressionType;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
@@ -155,22 +156,45 @@ public interface BusPublisher {
public static class KafkaPublisherWrapper implements BusPublisher {
private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+ private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+ private String topic;
/**
- * The actual Kafka publisher.
+ * Kafka publisher.
*/
- private final KafkaProducer producer;
+ private Producer<String, String> producer;
+ protected Properties kafkaProps;
/**
- * Constructor.
+ * Kafka Publisher Wrapper.
*
* @param busTopicParams topic parameters
*/
- public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
- // TODO Setting of topic parameters is not implemented yet.
- //Setup Properties for Kafka Producer
- Properties kafkaProps = new Properties();
- this.producer = new KafkaProducer(kafkaProps);
+ protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+
+ if (busTopicParams.isTopicInvalid()) {
+ throw new IllegalArgumentException("No topic for Kafka");
+ }
+
+ this.topic = busTopicParams.getTopic();
+
+ //Setup Properties for consumer
+ kafkaProps = new Properties();
+ kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ kafkaProps.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+ if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+ kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+ }
+
+ producer = new KafkaProducer<>(kafkaProps);
}
@Override
@@ -178,7 +202,18 @@ public interface BusPublisher {
if (message == null) {
throw new IllegalArgumentException("No message provided");
}
- // TODO Sending messages is not implemented yet
+
+ try {
+ //Create the record
+ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
+ UUID.randomUUID().toString(), message);
+
+ this.producer.send(record);
+ producer.flush();
+ } catch (Exception e) {
+ logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+ return false;
+ }
return true;
}
@@ -186,7 +221,7 @@ public interface BusPublisher {
public void close() {
logger.info("{}: CLOSE", this);
- try (this.producer) {
+ try {
this.producer.close();
} catch (Exception e) {
logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
index b564229b..6574d408 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
@@ -18,6 +18,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
import org.slf4j.Logger;
@@ -34,6 +35,8 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
*/
private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+ protected Map<String, String> additionalProps = null;
+
/**
* Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
* attributes.
@@ -47,6 +50,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
*/
public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
+ this.additionalProps = busTopicParams.getAdditionalProps();
}
/**
@@ -59,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
.servers(this.servers)
.topic(this.effectiveTopic)
.useHttps(this.useHttps)
+ .additionalProps(this.additionalProps)
.build());
logger.info("{}: KAFKA SINK created", this);
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
index b8362b83..2a651ee7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
@@ -18,6 +18,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.net.MalformedURLException;
+import java.util.Map;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
@@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
*/
public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
+ protected Map<String, String> additionalProps = null;
+
/**
* Constructor.
*
@@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource
*/
public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- this.init();
+ this.additionalProps = busTopicParams.getAdditionalProps();
+ try {
+ this.init();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e);
+ }
}
/**
* Initialize the Cambria client.
*/
@Override
- public void init() {
- this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder()
+ public void init() throws MalformedURLException {
+ BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
.servers(this.servers)
.topic(this.effectiveTopic)
- .useHttps(this.useHttps)
- .build());
+ .fetchTimeout(this.fetchTimeout)
+ .consumerGroup(this.consumerGroup)
+ .useHttps(this.useHttps);
+
+ this.consumer = new BusConsumer.KafkaConsumerWrapper(builder
+ .additionalProps(this.additionalProps)
+ .build());
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
index 49dff287..46a6c398 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
@@ -129,7 +129,6 @@ public final class PolicyEndPointProperties {
public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
-
/* Topic Sink Values */
/**
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
index 3e62f98f..113a4bd1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
@@ -24,9 +24,13 @@ package org.onap.policy.common.endpoints.utils;
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@@ -47,7 +51,7 @@ public class KafkaPropertyUtils {
public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
- //TODO More Kafka properties to be added
+
return BusTopicParams.builder()
.servers(serverList)
.topic(topic)
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
index 1a815e1a..a00879c1 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
@@ -32,7 +32,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameters;
public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
- public static final String SERVER = "my-server";
+ public static final String SERVER = "localhost:9092";
public static final String TOPIC2 = "my-topic-2";
@Getter
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
new file mode 100644
index 00000000..c109e70a
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
@@ -0,0 +1,192 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
+
+ private SinkFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ factory = new SinkFactory();
+ }
+
+ @After
+ public void tearDown() {
+ factory.destroy();
+ }
+
+ @Test
+ @Override
+ public void testBuildBusTopicParams() {
+ super.testBuildBusTopicParams();
+ super.testBuildBusTopicParams_Ex();
+ }
+
+ @Test
+ @Override
+ public void testBuildListOfStringString() {
+ super.testBuildListOfStringString();
+
+ // check parameters that were used
+ BusTopicParams params = getLastParams();
+ assertEquals(false, params.isAllowSelfSignedCerts());
+ }
+
+ @Test
+ @Override
+ public void testBuildProperties() {
+ List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+ assertEquals(1, topics.size());
+ assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
+ assertEquals(MY_PARTITION, params.getPartitionId());
+
+ List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
+ .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
+ assertEquals(1, topics2.size());
+ assertEquals(TOPIC3, topics2.get(0).getTopic());
+ assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
+
+ initFactory();
+
+ assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
+ }
+
+ @Test
+ @Override
+ public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
+ super.testDestroyString_Ex();
+ }
+
+ @Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
+ @Test
+ public void testGet() {
+ super.testGet_Ex();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
+ }
+
+ @Override
+ protected void initFactory() {
+ if (factory != null) {
+ factory.destroy();
+ }
+
+ factory = new SinkFactory();
+ }
+
+ @Override
+ protected List<KafkaTopicSink> buildTopics(Properties properties) {
+ return factory.build(properties);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(BusTopicParams params) {
+ return factory.build(params);
+ }
+
+ @Override
+ protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
+ return factory.build(servers, topic);
+ }
+
+ @Override
+ protected void destroyFactory() {
+ factory.destroy();
+ }
+
+ @Override
+ protected void destroyTopic(String topic) {
+ factory.destroy(topic);
+ }
+
+ @Override
+ protected List<KafkaTopicSink> getInventory() {
+ return factory.inventory();
+ }
+
+ @Override
+ protected KafkaTopicSink getTopic(String topic) {
+ return factory.get(topic);
+ }
+
+ @Override
+ protected BusTopicParams getLastParams() {
+ return factory.params.getLast();
+ }
+
+ @Override
+ protected TopicPropertyBuilder makePropBuilder() {
+ return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
+ }
+
+ /**
+ * Factory that records the parameters of all of the sinks it creates.
+ */
+ private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
+ private Deque<BusTopicParams> params = new LinkedList<>();
+
+ @Override
+ protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
+ params.add(busTopicParams);
+ return super.makeSink(busTopicParams);
+ }
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
index 6fa80a41..3a62b4a7 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
@@ -40,6 +42,8 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
private SourceFactory factory;
+ public static final String KAFKA_SERVER = "localhost:9092";
+
/**
* Creates the object to be tested.
*/
@@ -58,12 +62,6 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
@Test
@Override
- public void testBuildBusTopicParams() {
- super.testBuildBusTopicParams_Ex();
- }
-
- @Test
- @Override
public void testBuildProperties() {
initFactory();
@@ -71,15 +69,30 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
assertEquals(1, topics.size());
assertEquals(MY_TOPIC, topics.get(0).getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+ BusTopicParams params = getLastParams();
+ assertEquals(true, params.isManaged());
+ assertEquals(false, params.isUseHttps());
+ assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
}
@Test
@Override
public void testDestroyString_testGet_testInventory() {
+ super.testDestroyString_testGet_testInventory();
super.testDestroyString_Ex();
}
@Test
+ @Override
+ public void testDestroy() {
+ super.testDestroy();
+ }
+
+ @Test
public void testGet() {
super.testGet_Ex();
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
index 8b75fa35..bd88eec9 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
@@ -59,6 +59,8 @@ public class TopicTestBase {
public static final String ROUTE_PROP = "routeOffer";
public static final String MY_ROUTE = "my-route";
+ public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final int KAFKA_PORT = 9092;
/**
* Message used within exceptions that are expected.
@@ -76,6 +78,11 @@ public class TopicTestBase {
protected List<String> servers;
/**
+ * Servers to be added to the parameter builder.
+ */
+ protected List<String> kafkaServers;
+
+ /**
* Parameter builder used to build topic parameters.
*/
protected TopicParamsBuilder builder;
@@ -89,13 +96,14 @@ public class TopicTestBase {
addProps.put("my-key-B", "my-value-B");
servers = Arrays.asList("svra", "svrb");
+ kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092");
builder = makeBuilder();
}
/**
* Makes a fully populated parameter builder.
- *
+ *
* @return a new parameter builder
*/
public TopicParamsBuilder makeBuilder() {
@@ -117,6 +125,39 @@ public class TopicTestBase {
.fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
.longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
.password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC)
- .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME);
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME)
+ .serializationProvider(MY_SERIALIZER);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder() {
+ addProps.clear();
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;";
+ addProps.put("sasl.jaas.config", jaas);
+ addProps.put("sasl.mechanism", "SCRAM-SHA-512");
+ addProps.put("security.protocol", "SASL_PLAINTEXT");
+
+ return makeKafkaBuilder(addProps, kafkaServers);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @param addProps additional properties to be added to the builder
+ * @param servers servers to be added to the builder
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) {
+
+ return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+ .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+ .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER)
+ .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC)
+ .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false);
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
index da9f792b..7df5d129 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -34,8 +34,11 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.collections4.IteratorUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Before;
import org.junit.Test;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@@ -299,12 +302,46 @@ public class BusConsumerTest extends TopicTestBase {
@Test
public void testKafkaConsumerWrapper() throws Exception {
// verify that different wrappers can be built
- assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
+ new KafkaConsumerWrapper(makeBuilder().topic(null).build());
+ }
+
+ @Test(expected = java.lang.IllegalStateException.class)
+ public void testKafkaConsumerWrapperFetch() throws Exception {
+
+ //Setup Properties for consumer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ kafkaProps.setProperty("enable.auto.commit", "true");
+ kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+ kafka.consumer = consumer;
+
+ assertFalse(kafka.fetch().iterator().hasNext());
+ consumer.close();
+ }
+
+ @Test
+ public void testKafkaConsumerWrapperClose() throws Exception {
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
}
@Test
public void testKafkaConsumerWrapperToString() throws Exception {
- assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
+ assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
}
private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
new file mode 100644
index 00000000..b40b9541
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
+import org.onap.policy.common.utils.gson.GsonTestUtils;
+
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+ private InlineKafkaTopicSink sink;
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ }
+
+ @After
+ public void tearDown() {
+ sink.shutdown();
+ }
+
+ @Test
+ public void testToString() {
+ assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
+ }
+
+ @Test
+ public void testInit() {
+ // nothing null
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+ sink.init();
+ assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testGetTopicCommInfrastructure() {
+ assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
+ }
+
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
index cc096585..6b63c9f4 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
@@ -42,7 +42,7 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
public void setUp() {
super.setUp();
- source = new SingleThreadedKafkaTopicSource(makeBuilder().build());
+ source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
}
@After
@@ -50,9 +50,15 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
source.shutdown();
}
+ public void testSerialize() {
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+ .doesNotThrowAnyException();
+ }
+
@Test
public void testToString() {
assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
+ source.shutdown();
}
@Test
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json
index 7c512a87..42b7a036 100644
--- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json
+++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json
@@ -1,11 +1,19 @@
{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "useHttps" : true,
- "topicCommInfrastructure" : "KAFKA",
- "partitionKey" : "my-partition"
+ "servers": [
+ "svra",
+ "svrb"
+ ],
+ "topic": "my-topic",
+ "effectiveTopic": "my-effective-topic",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "useHttps": false,
+ "topicCommInfrastructure": "KAFKA",
+ "partitionKey": "my-partition",
+ "additionalProps": {
+ "security.protocol": "SASL_PLAINTEXT",
+ "sasl.mechanism": "SCRAM-SHA-512",
+ "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+ }
}
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json
index 626d87e8..38cc2f8e 100644
--- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json
+++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json
@@ -1,10 +1,18 @@
{
- "servers" : [ "svra", "svrb" ],
- "topic" : "my-topic",
- "effectiveTopic" : "my-effective-topic",
- "recentEvents" : [ ],
- "alive" : false,
- "locked" : false,
- "useHttps" : true,
- "topicCommInfrastructure" : "KAFKA"
+ "servers": [
+ "localhost:9092",
+ "10.1.2.3:9092"
+ ],
+ "topic": "my-topic",
+ "effectiveTopic": "my-effective-topic",
+ "recentEvents": [],
+ "alive": false,
+ "locked": false,
+ "useHttps": false,
+ "topicCommInfrastructure": "KAFKA",
+ "additionalProps": {
+ "security.protocol": "SASL_PLAINTEXT",
+ "sasl.mechanism": "SCRAM-SHA-512",
+ "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+ }
}