blob: cd5b6635b950f0618b7072244a4a651d0d5a0fcf (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helper for writing Kafka-sourced string messages into HDFS: obtains a
 * {@link FileSystem} for a destination URI, opens (create-or-append) a stream
 * for a file under the root path, and writes message payloads to it.
 *
 * <p>NOTE(review): callers are responsible for closing the {@link FileSystem}
 * and {@link FSDataOutputStream} instances returned here — nothing in this
 * class closes them.
 */
public class HdfsWriter {
    // Was bound to CreateKafkaConsumer.class, which mislabeled every log line
    // emitted by this class; a logger should name its own class.
    private static final Logger log = LoggerFactory.getLogger(HdfsWriter.class);

    /**
     * Creates a {@link FileSystem} handle for the given HDFS destination.
     *
     * @param hdfsDestination HDFS URI (e.g. {@code hdfs://namenode:8020/...})
     * @return a FileSystem bound to that URI with a default Configuration
     * @throws IOException if the filesystem cannot be obtained
     */
    public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
        Configuration hdfsConfiguration = new Configuration();
        FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
        log.info(":::Created hdfsFileSystem:::");
        return hdfsFileSystem;
    }

    /**
     * Writes one message to the open HDFS stream.
     *
     * <p>Fix: {@code writeBytes(String)} (inherited from DataOutputStream)
     * discards the high byte of every char, silently corrupting non-ASCII
     * payloads. Encode explicitly as UTF-8 instead.
     *
     * @param fsDataOutputStream open stream positioned for the next write
     * @param bytesFromKafka     message payload to append
     * @throws IOException if the write fails
     */
    public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
        fsDataOutputStream.write(bytesFromKafka.getBytes(StandardCharsets.UTF_8));
        log.info(":::Wrote to HDFS:::");
    }

    /**
     * Opens an output stream for {@code /<hdfsFile>}: appends when the file
     * already exists, otherwise creates it.
     *
     * @param hdfsFileSystem filesystem to open the stream on
     * @param hdfsFile       file name, resolved under the root path
     * @return an open stream the caller must close
     * @throws IOException if the existence check, create, or append fails
     */
    public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
        // Build the path once instead of three separate identical Path objects.
        Path path = new Path("/" + hdfsFile);
        FSDataOutputStream fsDataOutputStream;
        if (hdfsFileSystem.exists(path)) {
            fsDataOutputStream = hdfsFileSystem.append(path);
        } else {
            fsDataOutputStream = hdfsFileSystem.create(path);
        }
        log.info(":::HDFSWriter invoked:::");
        return fsDataOutputStream;
    }
}
|