From 61b3ff91485571c24834b31c6ee7efc7ab1d0243 Mon Sep 17 00:00:00 2001
From: Rupali Shirode
Date: Thu, 7 Dec 2023 16:39:19 +0530
Subject: [SO] Remove DMaap Dependency in SO-bpmn-infra

Remove DMaap Dependency in SO-bpmn-infra

Issue-ID: SO-4122
Change-Id: I8fbe5761430c21b3f49b31a45ede095fdb72628f
Signed-off-by: Rupali Shirode
---
 common/pom.xml                                      |  17 ++++
 .../java/org/onap/so/client/kafka/KafkaClient.java  |  21 +++++
 .../onap/so/client/kafka/KafkaConsumerImpl.java     | 104 +++++++++++++++++++++
 .../resources/kafka/default-consumer.properties     |   6 ++
 .../so/client/kafka/KafkaConsumerImplTest.java      |  51 ++++++++++
 5 files changed, 199 insertions(+)
 create mode 100644 common/src/main/java/org/onap/so/client/kafka/KafkaClient.java
 create mode 100644 common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java
 create mode 100644 common/src/main/resources/kafka/default-consumer.properties
 create mode 100644 common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java

(limited to 'common')

diff --git a/common/pom.xml b/common/pom.xml
index 9713d006e1..847c9464a0 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -308,6 +308,23 @@
             <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-sleuth</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>uk.org.webcompere</groupId>
+            <artifactId>system-stubs-jupiter</artifactId>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>5.5.2</version>
+        </dependency>
diff --git a/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java b/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java
new file mode 100644
index 0000000000..2c695255e0
--- /dev/null
+++ b/common/src/main/java/org/onap/so/client/kafka/KafkaClient.java
@@ -0,0 +1,21 @@
+package org.onap.so.client.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import java.io.IOException;
+import java.util.Properties;
+
+public class KafkaClient {
+    protected static Logger logger = LoggerFactory.getLogger(KafkaClient.class);
+    protected final Properties properties;
+
+    public KafkaClient(String filepath) throws IOException {
+        Resource resource = new ClassPathResource(filepath);
+        this.properties = new Properties();
+        properties.load(resource.getInputStream());
+
+    }
+
+}
diff --git a/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java b/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java
new file mode 100644
index 0000000000..69dd16acf8
--- /dev/null
+++ b/common/src/main/java/org/onap/so/client/kafka/KafkaConsumerImpl.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.client.kafka;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaConsumerImpl extends KafkaClient {
+
+    protected static Logger logger = LoggerFactory.getLogger(KafkaConsumerImpl.class);
+    private static final String kafkaBootstrapServers = "kafkaBootstrapServers";
+    private Consumer<String, String> consumer;
+
+    public KafkaConsumerImpl(String bootstrapServers) throws Exception {
+        super("kafka/default-consumer.properties");
+        setProperties(bootstrapServers);
+    }
+
+
+    public List<String> get(String topic, String consumerGroup, String consumerId) {
+        logger.info("consuming message from kafka topic : " + topic);
+        this.properties.put("group.id", consumerGroup);
+        this.properties.put("client.id", consumerId);
+        if (consumer == null) {
+            consumer = getKafkaConsumer(properties);
+            consumer.subscribe(Arrays.asList(topic));
+        }
+        ArrayList<String> msgs = new ArrayList<>();
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+        for (ConsumerRecord<String, String> rec : records) {
+            msgs.add(rec.value());
+        }
+        logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<READING THE CONSUMED MESSAGES>>>>>>>>>>>>>>>>>>>>>>>>>>>");
+        msgs.forEach(msg -> logger.info("MESSAGE CONSUMED FROM KAFKA : <<<<<" + msg + ">>>>>"));
+        return msgs;
+    }
+
+    private void setProperties(String bootstrapServers) throws Exception {
+        if (bootstrapServers == null) {
+            logger.error("Environment Variable " + kafkaBootstrapServers + " is missing");
+            throw new Exception("Environment Variable " + kafkaBootstrapServers + " is missing");
+        } else {
+            this.properties.put("bootstrap.servers", bootstrapServers);
+        }
+
+        if (System.getenv("JAAS_CONFIG") == null) {
+            logger.info("Not using any authentication for kafka interaction");
+        } else {
+            logger.info("Using {} authentication provided for kafka interaction",
+                    ScramMechanism.SCRAM_SHA_512.mechanismName());
+            this.properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            this.properties.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+            this.properties.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+        }
+    }
+
+    public static KafkaConsumer<String, String> getKafkaConsumer(Properties properties) {
+        return new KafkaConsumer<>(properties);
+    }
+
+    public void setConsumer(Consumer<String, String> kafkaConsumer) {
+        this.consumer = kafkaConsumer;
+    }
+
+    public void close() {
+        if (consumer != null) {
+            logger.info("Closing the Kafka Consumer");
+            consumer.close();
+            consumer = null;
+        }
+    }
+
+}
diff --git a/common/src/main/resources/kafka/default-consumer.properties b/common/src/main/resources/kafka/default-consumer.properties
new file mode 100644
index 0000000000..a7edf58b6b
--- /dev/null
+++ b/common/src/main/resources/kafka/default-consumer.properties
@@ -0,0 +1,6 @@
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+max.poll.interval.ms=300000
+heartbeat.interval.ms=60000
+session.timeout.ms=240000
+max.poll.records=1000
diff --git a/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java b/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java
new file mode 100644
index 0000000000..d71e562b64
--- /dev/null
+++ b/common/src/test/java/org/onap/so/client/kafka/KafkaConsumerImplTest.java
@@ -0,0 +1,51 @@
+package org.onap.so.client.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(SystemStubsExtension.class)
+public class KafkaConsumerImplTest {
+    private KafkaConsumerImpl consumer;
+    private static MockConsumer<String, String> mockConsumer;
+    @SystemStub
+    EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+    @Before
+    public void setup() {
+        environmentVariables.set("JAAS_CONFIG", "jaas.config");
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        configureMockConsumer();
+    }
+
+    @Test
+    public void consumerShouldConsumeMessages() throws Exception {
+        consumer = new KafkaConsumerImpl("localhost:9092");
+        consumer.setConsumer(mockConsumer);
+        List<String> response = consumer.get("TOPIC", "CG1", "C1");
+        assertThat(response).contains("I", "like", "pizza");
+    }
+
+    private void configureMockConsumer() {
+        mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
+
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
+        mockConsumer.updateBeginningOffsets(beginningOffsets);
+        mockConsumer.addRecord(new ConsumerRecord<>("TOPIC", 0, 0L, "key", "I"));
+        mockConsumer.addRecord(new ConsumerRecord<>("TOPIC", 0, 1L, "key", "like"));
+        mockConsumer.addRecord(new ConsumerRecord<>("TOPIC", 0, 2L, "key", "pizza"));
+
+    }
+}
--
cgit 1.2.3-korg
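For readers trying out the new consumer in isolation, the sketch below shows one way KafkaConsumerImpl could be driven by a caller. It is illustrative only and not part of the commit above: the package name, the KAFKA_BOOTSTRAP_SERVERS environment variable, and the topic, consumer group, and client id values are hypothetical placeholders.

    package org.onap.so.example; // hypothetical package, for illustration only

    import java.util.List;
    import org.onap.so.client.kafka.KafkaConsumerImpl;

    public class KafkaConsumerExample {
        public static void main(String[] args) throws Exception {
            // Assumption: the broker list comes from an environment variable chosen by the caller;
            // the constructor only requires a non-null bootstrap-servers string and throws otherwise.
            String bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");

            // Loads kafka/default-consumer.properties from the classpath; if JAAS_CONFIG is set,
            // the client switches to SASL_PLAINTEXT with SCRAM-SHA-512.
            KafkaConsumerImpl consumer = new KafkaConsumerImpl(bootstrapServers);
            try {
                // The first call subscribes to the topic, then polls for up to one second
                // and returns the record values as plain strings.
                List<String> messages = consumer.get("example-topic", "example-group", "example-client-1");
                messages.forEach(msg -> System.out.println("received: " + msg));
            } finally {
                consumer.close();
            }
        }
    }

A single call to get() returns at most one poll's worth of records (capped by max.poll.records, 1000 in the default properties file), so callers that need continuous consumption would invoke it in a loop.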