summaryrefslogtreecommitdiffstats
path: root/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
blob: cd5b6635b950f0618b7072244a4a651d0d5a0fcf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;

public class HdfsWriter {

    private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);


    public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
        Configuration hdfsConfiguration = new Configuration();
        FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
        log.info(":::Created hdfsFileSystem:::");
        return hdfsFileSystem;
    }


    public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
        fsDataOutputStream.writeBytes(bytesFromKafka);
        log.info(":::Wrote to HDFS:::");
    }


    public FSDataOutputStream  invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
        FSDataOutputStream fsDataOutputStream;
        if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
            fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
        else
            fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
        log.info(":::HDFSWriter invoked:::");
        return fsDataOutputStream;
    }

}