diff options
author | Marco Platania <platania@research.att.com> | 2019-05-07 14:55:48 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-05-07 14:55:48 +0000 |
commit | 24d0c131ec95b43934dfb76ec397392ff851dbcf (patch) | |
tree | d5688147d9f67362bad366c37046417e8129c001 | |
parent | 0696abe27f2f8f2c19931a0d2c5ad0badaf5d236 (diff) | |
parent | b6555bc1f55e82a4436232ec28b2f5e50600d70d (diff) |
Merge "Source code initial workingDraft kafka2hdfs writer."
9 files changed, 363 insertions, 0 deletions
diff --git a/vnfs/DAaaS/.gitignore b/vnfs/DAaaS/.gitignore new file mode 100644 index 00000000..b13f7a20 --- /dev/null +++ b/vnfs/DAaaS/.gitignore @@ -0,0 +1,7 @@ +*.iml +.idea +target/ +dependency-reduced-pom.xml +File2hdfsApp.java +copyScript.sh +sample_configs.yaml diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md new file mode 100644 index 00000000..4de7d0f9 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/README.md @@ -0,0 +1,11 @@ +# HDFS-writer + +HDFS writer can read from a message from kafka topic and persist that in the +HDFS file system given. This is a work in progress and shall be moved +to separate source code repo later. + +## Usage + +## Config items + +## Troubleshooting
\ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml new file mode 100644 index 00000000..20c11fea --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.intel.onap</groupId> + <artifactId>hdfs-writer</artifactId> + <version>1.0</version> + + <!--Begin: compile and build the fat jar --> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>kafka2hdfsApp</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <!--End: compile and build the fat jar --> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>1.2.1</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>3.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>2.7.1</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>2.2.0</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>2.9.8</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.2.3</version> + </dependency> + + </dependencies> + +</project>
\ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java new file mode 100644 index 00000000..2042a146 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java @@ -0,0 +1,81 @@ +import config.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class CreateKafkaConsumer { + + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers"); + private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id"); + private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class"); + private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class"); + private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic"); + + private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL"); + private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file"); + + private KafkaConsumer<String, String> kafkaConsumer; + private Properties properties = new Properties(); + private HdfsWriter hdfsWriter; + private FileSystem hdfsFileSystem; + + + + public CreateKafkaConsumer() throws IOException{ + setKafkaProperties(); + kafkaConsumer = new KafkaConsumer<>(properties); + kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC)); + hdfsWriter = new HdfsWriter(); + hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL); + log.info(":::Created kafkaConsumer:::"); + } + + private void setKafkaProperties(){ + + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER); + log.info(":::Set kafka properties:::"); + } + + + public void processKafkaMessage() throws IOException{ + try{ + while(true){ + ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(100000); + if(recordsPerPartition.isEmpty()) + log.info(":::recordsPerPartition is NULL:::"); + else + log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::"); + + for(ConsumerRecord<String, String> record:recordsPerPartition){ + log.info("Topic: "+record.topic()); + log.info("partition: "+record.partition()); + log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value()); + FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE); + hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value()); + fsDataOutputStream.close(); + } + + } + } + + finally { + log.info(":::Closing kafkaConsumer:::"); + kafkaConsumer.close(); + } + } +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java new file mode 100644 index 00000000..cd5b6635 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java @@ -0,0 +1,40 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +public class HdfsWriter { + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + + public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException { + Configuration hdfsConfiguration = new Configuration(); + FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration); + log.info(":::Created hdfsFileSystem:::"); + return hdfsFileSystem; + } + + + public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException { + fsDataOutputStream.writeBytes(bytesFromKafka); + log.info(":::Wrote to HDFS:::"); + } + + + public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException { + FSDataOutputStream fsDataOutputStream; + if(!hdfsFileSystem.exists(new Path("/"+hdfsFile))) + fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile)); + else + fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile)); + log.info(":::HDFSWriter invoked:::"); + return fsDataOutputStream; + } + +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java new file mode 100644 index 00000000..b4daf2d1 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java @@ -0,0 +1,51 @@ +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import config.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + + +public class Orchestrator { + + private static Logger logger = LoggerFactory.getLogger(Orchestrator.class); + + public void init(String configYamlFile){ + + parseConfigYaml(configYamlFile); + } + + private void parseConfigYaml(String configYaml) { + + URL fileUrl = getClass().getResource(configYaml); + if(fileUrl==null) + System.out.println("::: Config file missing!!! :::"); + + else{ + Configuration conf = new Configuration(); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + String realConfigYaml = configYaml; + + if (!realConfigYaml.startsWith("/")) { + realConfigYaml = "/" + configYaml; + } + Map<String, Object> configs; + try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) { + TypeReference<HashMap<String, Object>> typeRef + = new TypeReference<HashMap<String, Object>>() { + }; + configs = mapper.readValue(is, typeRef); + conf.init(configs); + + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } +} + diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java new file mode 100644 index 00000000..c7de131b --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java @@ -0,0 +1,38 @@ +package config; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Configuration{ + + private static Logger log = LoggerFactory.getLogger(Configuration.class); + private static Map<String, Map<String, Object>> settings; + + public void init(Map<String, Object> yamlConfigs){ + settings = new HashMap<>(); + + if(yamlConfigs!=null){ + Iterator<String> keys = yamlConfigs.keySet().iterator(); + while(keys.hasNext()){ + String key = keys.next(); + + Object value = yamlConfigs.get(key); + + if(value instanceof Map){ + Map<String, Object> valueMap = (Map<String, Object>) value; + settings.put(key, valueMap); + } + } + } + log.info(":::Settings initiated :::"); + } + + public static Map<String, Map<String, Object>> getSettings() { + return settings; + } +}
\ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java new file mode 100644 index 00000000..5c041134 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java @@ -0,0 +1,14 @@ +import java.io.IOException; + +public class kafka2hdfsApp { + + public static void main(String[] args) throws IOException { + System.out.println("Begin::: kafka2hdfsApp"); + Orchestrator orchestrator = new Orchestrator(); + orchestrator.init(args[1]); + + CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer(); + createKafkaConsumer.processKafkaMessage(); + + } +} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml new file mode 100644 index 00000000..8955c304 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml @@ -0,0 +1,10 @@ +kafka: + bootStrapServers: + group_id: + key_deserialize_class: + value_deserialize_class: + topic: + +hdfs: + hdfsURL: + hdfs_remote_file: |