summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java2
-rw-r--r--kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java2
-rw-r--r--sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java24
-rw-r--r--sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java1
-rw-r--r--sampleClient/src/main/resources/application.yaml13
-rw-r--r--sampleClient/src/main/resources/docker-compose/config.properties3
-rw-r--r--sampleClient/src/main/resources/docker-compose/kafka.jaas.conf13
-rwxr-xr-xsampleClient/src/main/resources/docker-compose/runner.sh65
-rw-r--r--sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml49
-rw-r--r--sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config4
10 files changed, 163 insertions, 13 deletions
diff --git a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java
index 7986869..71fcac0 100644
--- a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java
+++ b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaClient.java
@@ -70,7 +70,7 @@ public class OnapKafkaClient {
* Publish data to a given topic
* @param topicName The topic to which the message should be published
* @param data The data to publish to the topic specified
- * @return
+ * @return The RecordMetedata of the request
*/
public RecordMetadata publishToTopic(String topicName, String data) {
// Should we check the data size and chunk it if necessary? Do we need to?
diff --git a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java
index e08e229..c6e312d 100644
--- a/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java
+++ b/kafkaClient/src/main/java/org/onap/dmaap/kafka/OnapKafkaConsumer.java
@@ -56,7 +56,7 @@ public class OnapKafkaConsumer {
Properties props = new Properties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getKafkaBootstrapServers());
props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
index 37a3097..c80c87f 100644
--- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
+++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
@@ -20,7 +20,11 @@
package org.onap.dmaap.kafka.sample;
+import java.util.List;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.onap.dmaap.kafka.OnapKafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main implements CommandLineRunner{
+ private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName());
+
@Autowired
private SampleConfiguration configuration;
@@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{
}
@Override
- public void run(String... args) {
+ public void run(String... args) throws InterruptedException {
OnapKafkaClient handler = new OnapKafkaClient(configuration);
- handler.fetchFromTopic("dummy.topic.blah");
+ String testTopic = configuration.getConsumerTopics().get(0);
+ for (int i = 0; i < 5; i++) {
+ RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i);
+ if (recordMetadata != null) {
+ log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+ }
+ }
+ int fetch = 0;
+ while (true) {
+ fetch++;
+ log.info("Fetch {} from topic: {}", fetch, testTopic);
+ List<String> res = handler.fetchFromTopic(testTopic);
+ log.info("Messages from fetch {}: " + res, fetch);
+ Thread.sleep(3000);
+ }
}
} \ No newline at end of file
diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java
index 0cb5498..601504a 100644
--- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java
+++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/SampleConfiguration.java
@@ -44,5 +44,4 @@ public class SampleConfiguration implements IKafkaConfig {
private List<String> producerTopics;
private String kafkaSaslJaasConfig;
- // private String kafkaSaslMechanism;
}
diff --git a/sampleClient/src/main/resources/application.yaml b/sampleClient/src/main/resources/application.yaml
index b8a0f70..c592fe4 100644
--- a/sampleClient/src/main/resources/application.yaml
+++ b/sampleClient/src/main/resources/application.yaml
@@ -1,11 +1,8 @@
kafka:
kafkaBootstrapServers: [localhost:9092]
pollingTimeout: 10
- consumerGroup: my-consumer-group
- consumerID: my-consumer-id
- consumerTopics: [test.mytopic.1, test.mytopic.2]
- producerTopics: [test.mytopic.3]
- kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;}
-
- #kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;}
- #kafkaSaslMechanism: ${SASL_MECHANISM:PLAIN} \ No newline at end of file
+ consumerGroup: test-consumer-group
+ consumerID: test-id
+ consumerTopics: [test-topic.1]
+ producerTopics: [test-topic.1]
+ kafkaSaslJaasConfig: ${SASL_JAAS_CONFIG:org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";}
diff --git a/sampleClient/src/main/resources/docker-compose/config.properties b/sampleClient/src/main/resources/docker-compose/config.properties
new file mode 100644
index 0000000..7b8734e
--- /dev/null
+++ b/sampleClient/src/main/resources/docker-compose/config.properties
@@ -0,0 +1,3 @@
+sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="client-secret";
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-512
diff --git a/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf b/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf
new file mode 100644
index 0000000..1124681
--- /dev/null
+++ b/sampleClient/src/main/resources/docker-compose/kafka.jaas.conf
@@ -0,0 +1,13 @@
+KafkaServer {
+ org.apache.kafka.common.security.scram.ScramLoginModule required
+ username="broker"
+ password="broker"
+ user_broker="broker"
+ user_client="client-secret";
+};
+
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="kafka"
+ password="kafka";
+};
diff --git a/sampleClient/src/main/resources/docker-compose/runner.sh b/sampleClient/src/main/resources/docker-compose/runner.sh
new file mode 100755
index 0000000..2a188b1
--- /dev/null
+++ b/sampleClient/src/main/resources/docker-compose/runner.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+function start {
+ docker compose -f scram-docker-compose.yml up -d
+
+ until [ "$(docker inspect -f {{.State.Running}} broker)" == "true" ]; do
+ sleep 1;
+ done;
+
+ echo -e "\n Creating kafka users"
+ docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker],SCRAM-SHA-512=[password=broker]' --entity-type users --entity-name broker
+ docker exec broker kafka-configs --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=client-secret],SCRAM-SHA-512=[password=client-secret]' --entity-type users --entity-name client
+
+ echo -e "\n Creating test topic"
+ docker exec broker kafka-topics --create --bootstrap-server broker:9092 --replication-factor 1 --partitions 1 --topic test-topic.1 --command-config config.properties
+
+ echo -e "\n Listing existing topics"
+ docker exec broker kafka-topics --list --bootstrap-server localhost:9092 --command-config config.properties
+
+ echo -e "\n Adding broker to /etc/hosts"
+ echo '127.0.0.1 broker' | sudo tee -a /etc/hosts
+}
+
+
+function stop {
+
+ docker compose -f scram-docker-compose.yml down
+
+ sudo sed -i.bak '/broker/d' /etc/hosts
+}
+
+function publisher {
+ docker exec -it broker kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic.1 --producer.config config.properties
+}
+
+showHelp() {
+cat << EOF
+Usage: ./runner.sh [start|stop]
+
+start
+
+stop
+
+EOF
+}
+
+while true
+do
+case "$1" in
+start)
+ start
+ ;;
+pub)
+ publisher
+ ;;
+stop)
+ stop
+ ;;
+*)
+ showHelp
+ shift
+ break;;
+esac
+shift
+done \ No newline at end of file
diff --git a/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml b/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml
new file mode 100644
index 0000000..562ad97
--- /dev/null
+++ b/sampleClient/src/main/resources/docker-compose/scram-docker-compose.yml
@@ -0,0 +1,49 @@
+version: '3.5'
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:7.3.2
+ hostname: zookeeper
+ container_name: zookeeper
+ restart: always
+ ports:
+ - "9999:9999"
+ volumes:
+ - ./zookeeper.sasl.jaas.config:/etc/kafka/zookeeper_server_jaas.conf
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ KAFKA_JMX_HOSTNAME: localhost
+ KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
+ -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
+ -Dzookeeper.allowSaslFailedClients=false
+ -Dzookeeper.requireClientAuthScheme=sasl
+
+ broker:
+ image: confluentinc/cp-server:7.3.2
+ hostname: broker
+ container_name: broker
+ restart: always
+ ports:
+ - "9092:9092"
+ volumes:
+ - ./kafka.jaas.conf:/etc/kafka/kafka_server_jaas.conf
+ - ./config.properties:/home/appuser/config.properties
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_PLAINTEXT:SASL_PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://broker:9092
+ KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+ KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+ CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
+ CONFLUENT_METRICS_ENABLE: 'false'
+ KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
+
diff --git a/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config b/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config
new file mode 100644
index 0000000..9575461
--- /dev/null
+++ b/sampleClient/src/main/resources/docker-compose/zookeeper.sasl.jaas.config
@@ -0,0 +1,4 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_kafka="kafka";
+};