aboutsummaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src
diff options
context:
space:
mode:
Diffstat (limited to 'vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src')
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java81
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java40
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java51
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java38
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java14
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml10
6 files changed, 234 insertions, 0 deletions
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
new file mode 100644
index 00000000..2042a146
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
@@ -0,0 +1,81 @@
+import config.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class CreateKafkaConsumer {
+
+
+ private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+ private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers");
+ private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id");
+ private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class");
+ private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class");
+ private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic");
+
+ private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL");
+ private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file");
+
+ private KafkaConsumer<String, String> kafkaConsumer;
+ private Properties properties = new Properties();
+ private HdfsWriter hdfsWriter;
+ private FileSystem hdfsFileSystem;
+
+
+
+ public CreateKafkaConsumer() throws IOException{
+ setKafkaProperties();
+ kafkaConsumer = new KafkaConsumer<>(properties);
+ kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
+ hdfsWriter = new HdfsWriter();
+ hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL);
+ log.info(":::Created kafkaConsumer:::");
+ }
+
+ private void setKafkaProperties(){
+
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER);
+ log.info(":::Set kafka properties:::");
+ }
+
+
+ public void processKafkaMessage() throws IOException{
+ try{
+ while(true){
+ ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(100000);
+ if(recordsPerPartition.isEmpty())
+ log.info(":::recordsPerPartition is NULL:::");
+ else
+ log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::");
+
+ for(ConsumerRecord<String, String> record:recordsPerPartition){
+ log.info("Topic: "+record.topic());
+ log.info("partition: "+record.partition());
+ log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value());
+ FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE);
+ hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value());
+ fsDataOutputStream.close();
+ }
+
+ }
+ }
+
+ finally {
+ log.info(":::Closing kafkaConsumer:::");
+ kafkaConsumer.close();
+ }
+ }
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
new file mode 100644
index 00000000..cd5b6635
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
@@ -0,0 +1,40 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class HdfsWriter {
+
+ private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+
+ public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
+ Configuration hdfsConfiguration = new Configuration();
+ FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
+ log.info(":::Created hdfsFileSystem:::");
+ return hdfsFileSystem;
+ }
+
+
+ public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
+ fsDataOutputStream.writeBytes(bytesFromKafka);
+ log.info(":::Wrote to HDFS:::");
+ }
+
+
+ public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
+ FSDataOutputStream fsDataOutputStream;
+ if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
+ fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
+ else
+ fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
+ log.info(":::HDFSWriter invoked:::");
+ return fsDataOutputStream;
+ }
+
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
new file mode 100644
index 00000000..b4daf2d1
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
@@ -0,0 +1,51 @@
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import config.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class Orchestrator {
+
+ private static Logger logger = LoggerFactory.getLogger(Orchestrator.class);
+
+ public void init(String configYamlFile){
+
+ parseConfigYaml(configYamlFile);
+ }
+
+ private void parseConfigYaml(String configYaml) {
+
+ URL fileUrl = getClass().getResource(configYaml);
+ if(fileUrl==null)
+ System.out.println("::: Config file missing!!! :::");
+
+ else{
+ Configuration conf = new Configuration();
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ String realConfigYaml = configYaml;
+
+ if (!realConfigYaml.startsWith("/")) {
+ realConfigYaml = "/" + configYaml;
+ }
+ Map<String, Object> configs;
+ try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) {
+ TypeReference<HashMap<String, Object>> typeRef
+ = new TypeReference<HashMap<String, Object>>() {
+ };
+ configs = mapper.readValue(is, typeRef);
+ conf.init(configs);
+
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+}
+
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
new file mode 100644
index 00000000..c7de131b
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
@@ -0,0 +1,38 @@
+package config;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Configuration{
+
+ private static Logger log = LoggerFactory.getLogger(Configuration.class);
+ private static Map<String, Map<String, Object>> settings;
+
+ public void init(Map<String, Object> yamlConfigs){
+ settings = new HashMap<>();
+
+ if(yamlConfigs!=null){
+ Iterator<String> keys = yamlConfigs.keySet().iterator();
+ while(keys.hasNext()){
+ String key = keys.next();
+
+ Object value = yamlConfigs.get(key);
+
+ if(value instanceof Map){
+ Map<String, Object> valueMap = (Map<String, Object>) value;
+ settings.put(key, valueMap);
+ }
+ }
+ }
+ log.info(":::Settings initiated :::");
+ }
+
+ public static Map<String, Map<String, Object>> getSettings() {
+ return settings;
+ }
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
new file mode 100644
index 00000000..5c041134
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
@@ -0,0 +1,14 @@
+import java.io.IOException;
+
+public class kafka2hdfsApp {
+
+ public static void main(String[] args) throws IOException {
+ System.out.println("Begin::: kafka2hdfsApp");
+ Orchestrator orchestrator = new Orchestrator();
+ orchestrator.init(args[1]);
+
+ CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer();
+ createKafkaConsumer.processKafkaMessage();
+
+ }
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
new file mode 100644
index 00000000..8955c304
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
@@ -0,0 +1,10 @@
+kafka:
+ bootStrapServers:
+ group_id:
+ key_deserialize_class:
+ value_deserialize_class:
+ topic:
+
+hdfs:
+ hdfsURL:
+ hdfs_remote_file: