summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarco Platania <platania@research.att.com>2019-05-07 14:55:48 +0000
committerGerrit Code Review <gerrit@onap.org>2019-05-07 14:55:48 +0000
commit24d0c131ec95b43934dfb76ec397392ff851dbcf (patch)
treed5688147d9f67362bad366c37046417e8129c001
parent0696abe27f2f8f2c19931a0d2c5ad0badaf5d236 (diff)
parentb6555bc1f55e82a4436232ec28b2f5e50600d70d (diff)
Merge "Source code initial workingDraft kafka2hdfs writer."
-rw-r--r--vnfs/DAaaS/.gitignore7
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md11
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml111
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java81
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java40
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java51
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java38
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java14
-rw-r--r--vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml10
9 files changed, 363 insertions, 0 deletions
diff --git a/vnfs/DAaaS/.gitignore b/vnfs/DAaaS/.gitignore
new file mode 100644
index 00000000..b13f7a20
--- /dev/null
+++ b/vnfs/DAaaS/.gitignore
@@ -0,0 +1,7 @@
+*.iml
+.idea
+target/
+dependency-reduced-pom.xml
+File2hdfsApp.java
+copyScript.sh
+sample_configs.yaml
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md
new file mode 100644
index 00000000..4de7d0f9
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md
@@ -0,0 +1,11 @@
+# HDFS-writer
+
+HDFS writer can read from a message from kafka topic and persist that in the
+HDFS file system given. This is a work in progress and shall be moved
+to separate source code repo later.
+
+## Usage
+
+## Config items
+
+## Troubleshooting \ No newline at end of file
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml
new file mode 100644
index 00000000..20c11fea
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.intel.onap</groupId>
+ <artifactId>hdfs-writer</artifactId>
+ <version>1.0</version>
+
+ <!--Begin: compile and build the fat jar -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>kafka2hdfsApp</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase> <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <!--End: compile and build the fat jar -->
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>1.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>2.9.8</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.2.3</version>
+ </dependency>
+
+ </dependencies>
+
+</project> \ No newline at end of file
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
new file mode 100644
index 00000000..2042a146
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
@@ -0,0 +1,81 @@
+import config.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class CreateKafkaConsumer {
+
+
+ private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+ private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers");
+ private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id");
+ private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class");
+ private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class");
+ private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic");
+
+ private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL");
+ private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file");
+
+ private KafkaConsumer<String, String> kafkaConsumer;
+ private Properties properties = new Properties();
+ private HdfsWriter hdfsWriter;
+ private FileSystem hdfsFileSystem;
+
+
+
+ public CreateKafkaConsumer() throws IOException{
+ setKafkaProperties();
+ kafkaConsumer = new KafkaConsumer<>(properties);
+ kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
+ hdfsWriter = new HdfsWriter();
+ hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL);
+ log.info(":::Created kafkaConsumer:::");
+ }
+
+ private void setKafkaProperties(){
+
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER);
+ log.info(":::Set kafka properties:::");
+ }
+
+
+ public void processKafkaMessage() throws IOException{
+ try{
+ while(true){
+ ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(100000);
+ if(recordsPerPartition.isEmpty())
+ log.info(":::recordsPerPartition is NULL:::");
+ else
+ log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::");
+
+ for(ConsumerRecord<String, String> record:recordsPerPartition){
+ log.info("Topic: "+record.topic());
+ log.info("partition: "+record.partition());
+ log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value());
+ FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE);
+ hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value());
+ fsDataOutputStream.close();
+ }
+
+ }
+ }
+
+ finally {
+ log.info(":::Closing kafkaConsumer:::");
+ kafkaConsumer.close();
+ }
+ }
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
new file mode 100644
index 00000000..cd5b6635
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
@@ -0,0 +1,40 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class HdfsWriter {
+
+ private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+
+ public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
+ Configuration hdfsConfiguration = new Configuration();
+ FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
+ log.info(":::Created hdfsFileSystem:::");
+ return hdfsFileSystem;
+ }
+
+
+ public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
+ fsDataOutputStream.writeBytes(bytesFromKafka);
+ log.info(":::Wrote to HDFS:::");
+ }
+
+
+ public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
+ FSDataOutputStream fsDataOutputStream;
+ if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
+ fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
+ else
+ fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
+ log.info(":::HDFSWriter invoked:::");
+ return fsDataOutputStream;
+ }
+
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
new file mode 100644
index 00000000..b4daf2d1
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
@@ -0,0 +1,51 @@
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import config.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class Orchestrator {
+
+ private static Logger logger = LoggerFactory.getLogger(Orchestrator.class);
+
+ public void init(String configYamlFile){
+
+ parseConfigYaml(configYamlFile);
+ }
+
+ private void parseConfigYaml(String configYaml) {
+
+ URL fileUrl = getClass().getResource(configYaml);
+ if(fileUrl==null)
+ System.out.println("::: Config file missing!!! :::");
+
+ else{
+ Configuration conf = new Configuration();
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ String realConfigYaml = configYaml;
+
+ if (!realConfigYaml.startsWith("/")) {
+ realConfigYaml = "/" + configYaml;
+ }
+ Map<String, Object> configs;
+ try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) {
+ TypeReference<HashMap<String, Object>> typeRef
+ = new TypeReference<HashMap<String, Object>>() {
+ };
+ configs = mapper.readValue(is, typeRef);
+ conf.init(configs);
+
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ }
+ }
+ }
+}
+
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
new file mode 100644
index 00000000..c7de131b
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
@@ -0,0 +1,38 @@
+package config;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Configuration{
+
+ private static Logger log = LoggerFactory.getLogger(Configuration.class);
+ private static Map<String, Map<String, Object>> settings;
+
+ public void init(Map<String, Object> yamlConfigs){
+ settings = new HashMap<>();
+
+ if(yamlConfigs!=null){
+ Iterator<String> keys = yamlConfigs.keySet().iterator();
+ while(keys.hasNext()){
+ String key = keys.next();
+
+ Object value = yamlConfigs.get(key);
+
+ if(value instanceof Map){
+ Map<String, Object> valueMap = (Map<String, Object>) value;
+ settings.put(key, valueMap);
+ }
+ }
+ }
+ log.info(":::Settings initiated :::");
+ }
+
+ public static Map<String, Map<String, Object>> getSettings() {
+ return settings;
+ }
+} \ No newline at end of file
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
new file mode 100644
index 00000000..5c041134
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
@@ -0,0 +1,14 @@
+import java.io.IOException;
+
+public class kafka2hdfsApp {
+
+ public static void main(String[] args) throws IOException {
+ System.out.println("Begin::: kafka2hdfsApp");
+ Orchestrator orchestrator = new Orchestrator();
+ orchestrator.init(args[1]);
+
+ CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer();
+ createKafkaConsumer.processKafkaMessage();
+
+ }
+}
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
new file mode 100644
index 00000000..8955c304
--- /dev/null
+++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
@@ -0,0 +1,10 @@
+kafka:
+ bootStrapServers:
+ group_id:
+ key_deserialize_class:
+ value_deserialize_class:
+ topic:
+
+hdfs:
+ hdfsURL:
+ hdfs_remote_file: