author    Dileep Ranganathan <dileep.ranganathan@intel.com>  2019-05-30 12:38:37 -0700
committer Dileep Ranganathan <dileep.ranganathan@intel.com>  2019-05-30 21:11:52 +0000
commit    3d5a3e06530c1250d48f7d838c619f3bfbcd019d (patch)
tree      349e370c43ce7318b3f7eb7736345de6872cbef2 /vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main
parent    31802660dfe74a8671ae29789f0018f0f887ea1a (diff)
Refactor Distributed Analytics project structure
Modified the project structure to improve maintainability and to add
future CI and integration test support.

Change-Id: Id30bfb1f83f23785a6b5f99e81f42f752d59c0f8
Issue-ID: ONAPARC-280
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Diffstat (limited to 'vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main')
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java  | 81
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java           | 40
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java         | 51
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java | 38
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java        | 14
-rw-r--r--  vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml         | 10
6 files changed, 0 insertions(+), 234 deletions(-)
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
deleted file mode 100644
index 2042a146..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-import config.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class CreateKafkaConsumer {
-
-
- private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
-
- private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers");
- private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id");
- private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class");
- private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class");
- private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic");
-
- private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL");
- private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file");
-
- private KafkaConsumer<String, String> kafkaConsumer;
- private Properties properties = new Properties();
- private HdfsWriter hdfsWriter;
- private FileSystem hdfsFileSystem;
-
-
-
- public CreateKafkaConsumer() throws IOException{
- setKafkaProperties();
- kafkaConsumer = new KafkaConsumer<>(properties);
- kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
- hdfsWriter = new HdfsWriter();
- hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL);
- log.info(":::Created kafkaConsumer:::");
- }
-
- private void setKafkaProperties(){
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER);
- log.info(":::Set kafka properties:::");
- }
-
-
- public void processKafkaMessage() throws IOException{
- try{
- while(true){
- ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(100000);
- if(recordsPerPartition.isEmpty())
- log.info(":::recordsPerPartition is NULL:::");
- else
- log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::");
-
- for(ConsumerRecord<String, String> record:recordsPerPartition){
- log.info("Topic: "+record.topic());
- log.info("partition: "+record.partition());
- log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value());
- FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE);
- hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value());
- fsDataOutputStream.close();
- }
-
- }
- }
-
- finally {
- log.info(":::Closing kafkaConsumer:::");
- kafkaConsumer.close();
- }
- }
-}
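
For reference: the deleted loop above opens and closes an HDFS stream for every record, and uses the long-based poll(long) that newer Kafka clients deprecate. A minimal sketch of the same loop against a Kafka 2.x client (Duration-based poll) might look like the following; the class name ConsumerLoopSketch, the one-second poll timeout, and the parameter wiring are illustrative, not part of this repo:

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLoopSketch {
    public static void run(Properties props, String topic, HdfsWriter hdfsWriter,
                           FileSystem fs, String remoteFile) throws IOException {
        // try-with-resources closes the consumer even if the loop throws
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // shorter poll timeout than the deleted code's 100000 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // same create-or-append, write, close cycle per record as the deleted code
                    FSDataOutputStream out = hdfsWriter.invokeHdfsWriter(fs, remoteFile);
                    hdfsWriter.writeMessageToHdfs(out, record.value());
                    out.close();
                }
            }
        }
    }
}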
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
deleted file mode 100644
index cd5b6635..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-
-public class HdfsWriter {
-
-	private static Logger log = LoggerFactory.getLogger(HdfsWriter.class);
-
-
- public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
- Configuration hdfsConfiguration = new Configuration();
- FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
- log.info(":::Created hdfsFileSystem:::");
- return hdfsFileSystem;
- }
-
-
- public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
- fsDataOutputStream.writeBytes(bytesFromKafka);
- log.info(":::Wrote to HDFS:::");
- }
-
-
- public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
- FSDataOutputStream fsDataOutputStream;
- if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
- fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
- else
- fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
- log.info(":::HDFSWriter invoked:::");
- return fsDataOutputStream;
- }
-
-}
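
A hedged usage sketch for the deleted HdfsWriter follows. The namenode URL and the file name are placeholders, not values from this repo, and append() only works on filesystems that support it (HDFS does; the raw local filesystem does not):

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

public class HdfsWriterUsageSketch {
    public static void main(String[] args) throws Exception {
        HdfsWriter writer = new HdfsWriter();
        // placeholder URL; point this at a real namenode
        FileSystem fs = writer.createHdfsFileSystem("hdfs://namenode:8020");
        // creates /kafka-messages.txt on first call, appends on later calls
        FSDataOutputStream out = writer.invokeHdfsWriter(fs, "kafka-messages.txt");
        writer.writeMessageToHdfs(out, "hello from kafka\n");
        out.close();
        fs.close();
    }
}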
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
deleted file mode 100644
index b4daf2d1..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import config.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class Orchestrator {
-
- private static Logger logger = LoggerFactory.getLogger(Orchestrator.class);
-
- public void init(String configYamlFile){
-
- parseConfigYaml(configYamlFile);
- }
-
- private void parseConfigYaml(String configYaml) {
-
- URL fileUrl = getClass().getResource(configYaml);
- if(fileUrl==null)
-			logger.error("::: Config file missing!!! :::");
-
- else{
- Configuration conf = new Configuration();
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- String realConfigYaml = configYaml;
-
- if (!realConfigYaml.startsWith("/")) {
- realConfigYaml = "/" + configYaml;
- }
- Map<String, Object> configs;
- try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) {
- TypeReference<HashMap<String, Object>> typeRef
- = new TypeReference<HashMap<String, Object>>() {
- };
- configs = mapper.readValue(is, typeRef);
- conf.init(configs);
-
- } catch (Exception e) {
-				logger.error("Failed to parse config yaml", e);
- }
- }
- }
-}
-
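At its core the deleted Orchestrator is a Jackson-YAML load from the classpath followed by a hand-off to config.Configuration. A condensed sketch of the same steps, assuming configs.yaml sits at the classpath root (the class name YamlLoadSketch is hypothetical):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import config.Configuration;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

public class YamlLoadSketch {
    public static void load() throws Exception {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        try (InputStream is = YamlLoadSketch.class.getResourceAsStream("/configs.yaml")) {
            if (is == null) {
                throw new IllegalStateException("configs.yaml not on classpath");
            }
            Map<String, Object> configs =
                    mapper.readValue(is, new TypeReference<HashMap<String, Object>>() {});
            new Configuration().init(configs); // publish to the static settings map
        }
    }
}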
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
deleted file mode 100644
index c7de131b..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package config;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class Configuration{
-
- private static Logger log = LoggerFactory.getLogger(Configuration.class);
- private static Map<String, Map<String, Object>> settings;
-
- public void init(Map<String, Object> yamlConfigs){
- settings = new HashMap<>();
-
- if(yamlConfigs!=null){
- Iterator<String> keys = yamlConfigs.keySet().iterator();
- while(keys.hasNext()){
- String key = keys.next();
-
- Object value = yamlConfigs.get(key);
-
- if(value instanceof Map){
- Map<String, Object> valueMap = (Map<String, Object>) value;
- settings.put(key, valueMap);
- }
- }
- }
- log.info(":::Settings initiated :::");
- }
-
- public static Map<String, Map<String, Object>> getSettings() {
- return settings;
- }
-}
\ No newline at end of file
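
Callers read nested values through the static map; this mirrors how CreateKafkaConsumer above resolved its Kafka settings (the class name SettingsReadSketch is hypothetical):

import config.Configuration;

public class SettingsReadSketch {
    // assumes Configuration.init(...) has already run (see Orchestrator above)
    public static String bootstrapServers() {
        return (String) Configuration.getSettings().get("kafka").get("bootStrapServers");
    }
}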
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
deleted file mode 100644
index 5c041134..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java
+++ /dev/null
@@ -1,14 +0,0 @@
-import java.io.IOException;
-
-public class kafka2hdfsApp {
-
- public static void main(String[] args) throws IOException {
- System.out.println("Begin::: kafka2hdfsApp");
- Orchestrator orchestrator = new Orchestrator();
-		orchestrator.init(args[0]); // the config file name is the first CLI argument
-
- CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer();
- createKafkaConsumer.processKafkaMessage();
-
- }
-}
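
The deleted entry point reads the config file name from the command line; a hypothetical invocation (the jar name is illustrative):

java -cp hdfs-writer.jar kafka2hdfsApp configs.yaml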
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
deleted file mode 100644
index 8955c304..00000000
--- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml
+++ /dev/null
@@ -1,10 +0,0 @@
-kafka:
- bootStrapServers:
- group_id:
- key_deserialize_class:
- value_deserialize_class:
- topic:
-
-hdfs:
- hdfsURL:
- hdfs_remote_file:
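
The deleted configs.yaml shipped with every value empty. An illustrative filled-in version, all values placeholders rather than settings from this repo:

kafka:
  bootStrapServers: "kafka-broker:9092"
  group_id: "hdfs-writer-group"
  key_deserialize_class: "org.apache.kafka.common.serialization.StringDeserializer"
  value_deserialize_class: "org.apache.kafka.common.serialization.StringDeserializer"
  topic: "metrics"

hdfs:
  hdfsURL: "hdfs://hdfs-namenode:8020"
  hdfs_remote_file: "kafka-messages.txt"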