diff options
author | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-05-30 12:38:37 -0700 |
---|---|---|
committer | Dileep Ranganathan <dileep.ranganathan@intel.com> | 2019-05-30 21:11:52 +0000 |
commit | 3d5a3e06530c1250d48f7d838c619f3bfbcd019d (patch) | |
tree | 349e370c43ce7318b3f7eb7736345de6872cbef2 /vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main | |
parent | 31802660dfe74a8671ae29789f0018f0f887ea1a (diff) |
Refactor Distributed Analytics project structure
Modified the project structure to improve maintainability and to add future CI and
integration test support.
Change-Id: Id30bfb1f83f23785a6b5f99e81f42f752d59c0f8
Issue-ID: ONAPARC-280
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Diffstat (limited to 'vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main')
6 files changed, 0 insertions, 234 deletions
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java deleted file mode 100644 index 2042a146..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/CreateKafkaConsumer.java +++ /dev/null @@ -1,81 +0,0 @@ -import config.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; - -public class CreateKafkaConsumer { - - - private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); - - private final String BOOTSTRAP_SERVERS = (String) Configuration.getSettings().get("kafka").get("bootStrapServers"); - private final String GROUP_ID_CONFIG = (String) Configuration.getSettings().get("kafka").get("group_id"); - private final String KEY_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("key_deserialize_class"); - private final String VAL_DESERIALIZER = (String) Configuration.getSettings().get("kafka").get("value_deserialize_class"); - private final String KAFKA_TOPIC = (String) Configuration.getSettings().get("kafka").get("topic"); - - private final String HDFS_URL= (String) Configuration.getSettings().get("hdfs").get("hdfsURL"); - private final String HDFS_REMOTE_FILE = (String) Configuration.getSettings().get("hdfs").get("hdfs_remote_file"); - - private KafkaConsumer<String, String> kafkaConsumer; - private Properties properties = new Properties(); - private HdfsWriter hdfsWriter; - private FileSystem hdfsFileSystem; - - - - public CreateKafkaConsumer() throws IOException{ - setKafkaProperties(); - kafkaConsumer = new KafkaConsumer<>(properties); - kafkaConsumer.subscribe(Collections.singletonList(KAFKA_TOPIC)); - hdfsWriter = new HdfsWriter(); - hdfsFileSystem = hdfsWriter.createHdfsFileSystem(HDFS_URL); - log.info(":::Created kafkaConsumer:::"); - } - - private void setKafkaProperties(){ - - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VAL_DESERIALIZER); - log.info(":::Set kafka properties:::"); - } - - - public void processKafkaMessage() throws IOException{ - try{ - while(true){ - ConsumerRecords<String, String> recordsPerPartition = kafkaConsumer.poll(100000); - if(recordsPerPartition.isEmpty()) - log.info(":::recordsPerPartition is NULL:::"); - else - log.info(":::size of recordsPerPartition: "+recordsPerPartition.count()+" :::"); - - for(ConsumerRecord<String, String> record:recordsPerPartition){ - log.info("Topic: "+record.topic()); - log.info("partition: "+record.partition()); - log.info("ReceivedKey: "+record.key()+" ReceivedValue: "+record.value()); - FSDataOutputStream fsDataOutputStream = hdfsWriter.invokeHdfsWriter(hdfsFileSystem, HDFS_REMOTE_FILE); - hdfsWriter.writeMessageToHdfs(fsDataOutputStream, record.value()); - fsDataOutputStream.close(); - } - - } - } - - finally { - log.info(":::Closing kafkaConsumer:::"); - kafkaConsumer.close(); - } - } -} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java deleted file mode 100644 index cd5b6635..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java +++ /dev/null @@ -1,40 +0,0 @@ -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; - -public class HdfsWriter { - - private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); - - - public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException { - Configuration hdfsConfiguration = new Configuration(); - FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration); - log.info(":::Created hdfsFileSystem:::"); - return hdfsFileSystem; - } - - - public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException { - fsDataOutputStream.writeBytes(bytesFromKafka); - log.info(":::Wrote to HDFS:::"); - } - - - public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException { - FSDataOutputStream fsDataOutputStream; - if(!hdfsFileSystem.exists(new Path("/"+hdfsFile))) - fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile)); - else - fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile)); - log.info(":::HDFSWriter invoked:::"); - return fsDataOutputStream; - } - -} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java deleted file mode 100644 index b4daf2d1..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/Orchestrator.java +++ /dev/null @@ -1,51 +0,0 @@ -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import config.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; - - -public class Orchestrator { - - private static Logger logger = LoggerFactory.getLogger(Orchestrator.class); - - public void init(String configYamlFile){ - - parseConfigYaml(configYamlFile); - } - - private void parseConfigYaml(String configYaml) { - - URL fileUrl = getClass().getResource(configYaml); - if(fileUrl==null) - System.out.println("::: Config file missing!!! :::"); - - else{ - Configuration conf = new Configuration(); - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - String realConfigYaml = configYaml; - - if (!realConfigYaml.startsWith("/")) { - realConfigYaml = "/" + configYaml; - } - Map<String, Object> configs; - try (InputStream is = getClass().getResourceAsStream(realConfigYaml)) { - TypeReference<HashMap<String, Object>> typeRef - = new TypeReference<HashMap<String, Object>>() { - }; - configs = mapper.readValue(is, typeRef); - conf.init(configs); - - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - } -} - diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java deleted file mode 100644 index c7de131b..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/config/Configuration.java +++ /dev/null @@ -1,38 +0,0 @@ -package config; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class Configuration{ - - private static Logger log = LoggerFactory.getLogger(Configuration.class); - private static Map<String, Map<String, Object>> settings; - - public void init(Map<String, Object> yamlConfigs){ - settings = new HashMap<>(); - - if(yamlConfigs!=null){ - Iterator<String> keys = yamlConfigs.keySet().iterator(); - while(keys.hasNext()){ - String key = keys.next(); - - Object value = yamlConfigs.get(key); - - if(value instanceof Map){ - Map<String, Object> valueMap = (Map<String, Object>) value; - settings.put(key, valueMap); - } - } - } - log.info(":::Settings initiated :::"); - } - - public static Map<String, Map<String, Object>> getSettings() { - return settings; - } -}
\ No newline at end of file diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java deleted file mode 100644 index 5c041134..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/kafka2hdfsApp.java +++ /dev/null @@ -1,14 +0,0 @@ -import java.io.IOException; - -public class kafka2hdfsApp { - - public static void main(String[] args) throws IOException { - System.out.println("Begin::: kafka2hdfsApp"); - Orchestrator orchestrator = new Orchestrator(); - orchestrator.init(args[1]); - - CreateKafkaConsumer createKafkaConsumer = new CreateKafkaConsumer(); - createKafkaConsumer.processKafkaMessage(); - - } -} diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml deleted file mode 100644 index 8955c304..00000000 --- a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/resources/configs.yaml +++ /dev/null @@ -1,10 +0,0 @@ -kafka: - bootStrapServers: - group_id: - key_deserialize_class: - value_deserialize_class: - topic: - -hdfs: - hdfsURL: - hdfs_remote_file: |