diff options
Diffstat (limited to 'vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java')
-rw-r--r-- | vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java new file mode 100644 index 00000000..cd5b6635 --- /dev/null +++ b/vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java @@ -0,0 +1,40 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +public class HdfsWriter { + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + + public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException { + Configuration hdfsConfiguration = new Configuration(); + FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration); + log.info(":::Created hdfsFileSystem:::"); + return hdfsFileSystem; + } + + + public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException { + fsDataOutputStream.writeBytes(bytesFromKafka); + log.info(":::Wrote to HDFS:::"); + } + + + public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException { + FSDataOutputStream fsDataOutputStream; + if(!hdfsFileSystem.exists(new Path("/"+hdfsFile))) + fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile)); + else + fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile)); + log.info(":::HDFSWriter invoked:::"); + return fsDataOutputStream; + } + +} |