summaryrefslogtreecommitdiffstats
path: root/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
diff options
context:
space:
mode:
Diffstat (limited to 'sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java')
-rw-r--r--sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java24
1 files changed, 22 insertions, 2 deletions
diff --git a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
index 37a3097..c80c87f 100644
--- a/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
+++ b/sampleClient/src/main/java/org/onap/dmaap/kafka/sample/Main.java
@@ -20,7 +20,11 @@
package org.onap.dmaap.kafka.sample;
+import java.util.List;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.onap.dmaap.kafka.OnapKafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
@@ -29,6 +33,8 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main implements CommandLineRunner{
+ private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName());
+
@Autowired
private SampleConfiguration configuration;
@@ -37,8 +43,22 @@ public class Main implements CommandLineRunner{
}
@Override
- public void run(String... args) {
+ public void run(String... args) throws InterruptedException {
OnapKafkaClient handler = new OnapKafkaClient(configuration);
- handler.fetchFromTopic("dummy.topic.blah");
+ String testTopic = configuration.getConsumerTopics().get(0);
+ for (int i = 0; i < 5; i++) {
+ RecordMetadata recordMetadata = handler.publishToTopic(testTopic, "dummy-message-"+i);
+ if (recordMetadata != null) {
+ log.info("Topic: {}, Partition: {}, Offset: {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
+ }
+ }
+ int fetch = 0;
+ while (true) {
+ fetch++;
+ log.info("Fetch {} from topic: {}", fetch, testTopic);
+ List<String> res = handler.fetchFromTopic(testTopic);
+ log.info("Messages from fetch {}: " + res, fetch);
+ Thread.sleep(3000);
+ }
}
} \ No newline at end of file