summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-07-07 01:03:14 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-07-07 01:03:14 -0700
commitdb7b404ee53d8cb078e2f88046b0b0ac3d6f952d (patch)
tree376957bae4994fb96cf9d779d7d73aa99de260de /components/datalake-handler/feeder/src/main/java
parent02453c8110fb0ec1d6d76f7550ef4c12806e0d9f (diff)
Unit test
Issue-ID: DCAEGEN2-1468 Change-Id: Ib0a1bbfa02390a9093e0a8ac4da1ae3fe2c1cd11 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java45
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java3
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java3
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java68
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java6
8 files changed, 50 insertions, 85 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index 2a653d89..cfd2462b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -142,11 +142,11 @@ public class Db {
if (this.getClass() != obj.getClass())
return false;
- return name.equals(((Db) obj).getName());
+ return id==((Db) obj).getId();
}
@Override
public int hashCode() {
- return name.hashCode();
+ return id;
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index c680e71b..13e0163e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -38,12 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
-import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -129,13 +123,6 @@ public class Topic {
@Column(name = "`flatten_array_path`")
protected String flattenArrayPath;
- public Topic() {
- }
-/*
- public Topic(String name) {//TODO
- //this.name = name;
- }
-*/
public String getName() {
return topicName.getId();
}
@@ -147,33 +134,7 @@ public class Topic {
return 3650;//default to 10 years for safe
}
}
-/*
- public boolean supportHdfs() {
- return supportDb(DbTypeEnum.HDFS);
- }
-
- public boolean supportElasticsearch() {
- return supportDb(DbTypeEnum.ES);
- }
-
- public boolean supportCouchbase() {
- return supportDb(DbTypeEnum.CB);
- }
- public boolean supportDruid() {
- return supportDb(DbTypeEnum.DRUID);
- }
-
- public boolean supportMongoDB() {
- return supportDb(DbTypeEnum.MONGO);
- }
-
- private boolean supportDb(DbTypeEnum dbTypeEnum) {
- for(Db db : dbs) {
-
- }
- }
-*/
public DataFormat getDataFormat2() {
if (dataFormat != null) {
return DataFormat.fromString(dataFormat);
@@ -204,7 +165,7 @@ public class Topic {
//extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
- String id = null;
+ String ret = null;
if (StringUtils.isNotBlank(messageIdPath)) {
String[] paths = messageIdPath.split(",");
@@ -216,10 +177,10 @@ public class Topic {
}
sb.append(json.query(paths[i]).toString());
}
- id = sb.toString();
+ ret = sb.toString();
}
- return id;
+ return ret;
}
public TopicConfig getTopicConfig() {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 942526d2..a51103b7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -70,12 +70,12 @@ public class TopicConfig {
if (this.getClass() != obj.getClass())
return false;
- return name.equals(((TopicConfig) obj).getName());
+ return id==((TopicConfig) obj).getId();
}
@Override
public int hashCode() {
- return name.hashCode();
+ return id;
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index bd2d9715..f2ac5e94 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -73,7 +73,8 @@ public class CouchbaseService implements DbStoreService {
}
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
// Initialize Couchbase Connection
try {
//this tunes the SDK (to customize connection timeout)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
index 5ea6e23e..c873c010 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
@@ -34,4 +34,6 @@ import org.onap.datalake.feeder.domain.EffectiveTopic;
public interface DbStoreService {
void saveJsons(EffectiveTopic topic, List<JSONObject> jsons);
+
+ void init();
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index 4dfcdd22..18b7e2fb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -83,7 +83,8 @@ public class ElasticsearchService implements DbStoreService {
//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
String elasticsearchHost = elasticsearch.getHost();
// Initialize the Connection
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
index ea0e77aa..1725ee41 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -32,7 +32,6 @@ import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,11 @@ import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import lombok.Getter;
-import lombok.Setter;
/**
* Service to write data to HDFS
@@ -64,20 +61,18 @@ import lombok.Setter;
public class HdfsService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
-
+
private Db hdfs;
@Autowired
ApplicationConfiguration config;
FileSystem fileSystem;
- private boolean isReady = false;
private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
- @Setter
@Getter
private class Buffer {
long lastFlush;
@@ -107,20 +102,21 @@ public class HdfsService implements DbStoreService {
}
}
- public void addData(List<Pair<Long, String>> messages) {
- if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
- lastFlush = System.currentTimeMillis();
- }
-
- messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
- }
-
+ /*
+ public void addData(List<Pair<Long, String>> messages) {
+ if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+ lastFlush = System.currentTimeMillis();
+ }
+
+ messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+ }
+ */
public void addData2(List<JSONObject> messages) {
if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
lastFlush = System.currentTimeMillis();
}
- messages.stream().forEach(message -> data.add(message.toString()));
+ messages.stream().forEach(message -> data.add(message.toString()));
}
private void saveMessages(String topic, List<String> bufferList) throws IOException {
@@ -157,9 +153,10 @@ public class HdfsService implements DbStoreService {
public HdfsService(Db db) {
hdfs = db;
}
-
+
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
// Initialize HDFS Connection
try {
//Get configuration of Hadoop system
@@ -179,10 +176,8 @@ public class HdfsService implements DbStoreService {
ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
hadoopShutdownHookManager.clearShutdownHooks();
- isReady = true;
} catch (Exception ex) {
log.error("error connection to HDFS.", ex);
- isReady = false;
}
}
@@ -212,22 +207,23 @@ public class HdfsService implements DbStoreService {
bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
}
- //used if raw data should be saved
- public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
- String topicStr = topic.getName();
-
- Map<String, Buffer> bufferMap = bufferLocal.get();
- final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
-
- buffer.addData(messages);
-
- if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
- buffer.flush(topicStr);
- } else {
- log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+ /*
+ //used if raw data should be saved
+ public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
+ String topicStr = topic.getName();
+
+ Map<String, Buffer> bufferMap = bufferLocal.get();
+ final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+ buffer.addData(messages);
+
+ if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+ buffer.flush(topicStr);
+ } else {
+ log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+ }
}
- }
-
+ */
@Override
public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
String topicStr = topic.getName();
@@ -242,7 +238,7 @@ public class HdfsService implements DbStoreService {
} else {
log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
}
-
+
}
-
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index 5cc4070d..a044790e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -48,6 +48,7 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoCredential;
+import com.mongodb.MongoTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -84,7 +85,8 @@ public class MongodbService implements DbStoreService {
}
@PostConstruct
- private void init() {
+ @Override
+ public void init() {
String host = mongodb.getHost();
Integer port = mongodb.getPort();
@@ -172,6 +174,8 @@ public class MongodbService implements DbStoreService {
for (BulkWriteError bulkWriteError : bulkWriteErrors) {
log.error("Failed record: {}", bulkWriteError);
}
+ } catch (MongoTimeoutException e) {
+ log.error("saveJsons()", e);
}
log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());