author     2019-07-07 01:03:14 -0700
committer  2019-07-07 01:03:14 -0700
commit     db7b404ee53d8cb078e2f88046b0b0ac3d6f952d
tree       376957bae4994fb96cf9d779d7d73aa99de260de /components/datalake-handler/feeder/src/main
parent     02453c8110fb0ec1d6d76f7550ef4c12806e0d9f
Unit test
Issue-ID: DCAEGEN2-1468
Change-Id: Ib0a1bbfa02390a9093e0a8ac4da1ae3fe2c1cd11
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main')
8 files changed, 50 insertions, 85 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index 2a653d89..cfd2462b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -142,11 +142,11 @@ public class Db {
 		if (this.getClass() != obj.getClass())
 			return false;
 
-		return name.equals(((Db) obj).getName());
+		return id==((Db) obj).getId();
 	}
 
 	@Override
 	public int hashCode() {
-		return name.hashCode();
+		return id;
 	}
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index c680e71b..13e0163e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -38,12 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
-import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
 
 import com.fasterxml.jackson.annotation.JsonBackReference;
 
@@ -129,13 +123,6 @@ public class Topic {
 	@Column(name = "`flatten_array_path`")
 	protected String flattenArrayPath;
 
-	public Topic() {
-	}
-/*
-	public Topic(String name) {//TODO
-		//this.name = name;
-	}
-*/
 	public String getName() {
 		return topicName.getId();
 	}
@@ -147,33 +134,7 @@ public class Topic {
 			return 3650;//default to 10 years for safe
 		}
 	}
-/*
-	public boolean supportHdfs() {
-		return supportDb(DbTypeEnum.HDFS);
-	}
-
-	public boolean supportElasticsearch() {
-		return supportDb(DbTypeEnum.ES);
-	}
-
-	public boolean supportCouchbase() {
-		return supportDb(DbTypeEnum.CB);
-	}
-
-	public boolean supportDruid() {
-		return supportDb(DbTypeEnum.DRUID);
-	}
-
-	public boolean supportMongoDB() {
-		return supportDb(DbTypeEnum.MONGO);
-	}
-
-	private boolean supportDb(DbTypeEnum dbTypeEnum) {
-		for(Db db : dbs) {
-		}
-	}
-*/
 	public DataFormat getDataFormat2() {
 		if (dataFormat != null) {
 			return DataFormat.fromString(dataFormat);
 		}
@@ -204,7 +165,7 @@ public class Topic {
 
 	//extract DB id from JSON attributes, support multiple attributes
 	public String getMessageId(JSONObject json) {
-		String id = null;
+		String ret = null;
 
 		if (StringUtils.isNotBlank(messageIdPath)) {
 			String[] paths = messageIdPath.split(",");
@@ -216,10 +177,10 @@ public class Topic {
 				}
 				sb.append(json.query(paths[i]).toString());
 			}
-			id = sb.toString();
+			ret = sb.toString();
 		}
 
-		return id;
+		return ret;
 	}
 
 	public TopicConfig getTopicConfig() {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 942526d2..a51103b7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -70,12 +70,12 @@ public class TopicConfig {
 		if (this.getClass() != obj.getClass())
 			return false;
 
-		return name.equals(((TopicConfig) obj).getName());
+		return id==((TopicConfig) obj).getId();
 	}
 
 	@Override
 	public int hashCode() {
-		return name.hashCode();
+		return id;
 	}
 
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index bd2d9715..f2ac5e94 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -73,7 +73,8 @@ public class CouchbaseService implements DbStoreService {
 	}
 
 	@PostConstruct
-	private void init() {
+	@Override
+	public void init() {
 		// Initialize Couchbase Connection
 		try {
 			//this tunes the SDK (to customize connection timeout)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
index 5ea6e23e..c873c010 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
@@ -34,4 +34,6 @@ import org.onap.datalake.feeder.domain.EffectiveTopic;
 public interface DbStoreService {
 
 	void saveJsons(EffectiveTopic topic, List<JSONObject> jsons);
+
+	void init();
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index 4dfcdd22..18b7e2fb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -83,7 +83,8 @@ public class ElasticsearchService implements DbStoreService {
 	//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
 	//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
 	@PostConstruct
-	private void init() {
+	@Override
+	public void init() {
 		String elasticsearchHost = elasticsearch.getHost();
 
 		// Initialize the Connection
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
index ea0e77aa..1725ee41 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,11 @@ import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import lombok.Getter;
-import lombok.Setter;
 
 /**
  * Service to write data to HDFS
@@ -64,20 +61,18 @@ import lombok.Setter;
 public class HdfsService implements DbStoreService {
 
 	private final Logger log = LoggerFactory.getLogger(this.getClass());
-	
+
 	private Db hdfs;
 
 	@Autowired
 	ApplicationConfiguration config;
 
 	FileSystem fileSystem;
-	private boolean isReady = false;
 
 	private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
 	private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
 	private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
 
-	@Setter
 	@Getter
 	private class Buffer {
 		long lastFlush;
@@ -107,20 +102,21 @@ public class HdfsService implements DbStoreService {
 			}
 		}
 
-		public void addData(List<Pair<Long, String>> messages) {
-			if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
-				lastFlush = System.currentTimeMillis();
-			}
-
-			messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
-		}
-
+		/*
+		public void addData(List<Pair<Long, String>> messages) {
+			if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+				lastFlush = System.currentTimeMillis();
+			}
+
+			messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+		}
+		*/
 		public void addData2(List<JSONObject> messages) {
 			if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
 				lastFlush = System.currentTimeMillis();
 			}
 
-			messages.stream().forEach(message -> data.add(message.toString()));
+			messages.stream().forEach(message -> data.add(message.toString()));
 		}
 
 		private void saveMessages(String topic, List<String> bufferList) throws IOException {
@@ -157,9 +153,10 @@ public class HdfsService implements DbStoreService {
 	public HdfsService(Db db) {
 		hdfs = db;
 	}
-	
+
 	@PostConstruct
-	private void init() {
+	@Override
+	public void init() {
 		// Initialize HDFS Connection
 		try {
 			//Get configuration of Hadoop system
@@ -179,10 +176,8 @@ public class HdfsService implements DbStoreService {
 			ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
 			hadoopShutdownHookManager.clearShutdownHooks();
 
-			isReady = true;
 		} catch (Exception ex) {
 			log.error("error connection to HDFS.", ex);
-			isReady = false;
 		}
 	}
 
@@ -212,22 +207,23 @@ public class HdfsService implements DbStoreService {
 		bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
 	}
 
-	//used if raw data should be saved
-	public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
-		String topicStr = topic.getName();
-
-		Map<String, Buffer> bufferMap = bufferLocal.get();
-		final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
-
-		buffer.addData(messages);
-
-		if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
-			buffer.flush(topicStr);
-		} else {
-			log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+	/*
+	//used if raw data should be saved
+	public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
+		String topicStr = topic.getName();
+
+		Map<String, Buffer> bufferMap = bufferLocal.get();
+		final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+		buffer.addData(messages);
+
+		if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+			buffer.flush(topicStr);
+		} else {
+			log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+		}
 	}
-	}
-	
+	*/
 	@Override
 	public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
 		String topicStr = topic.getName();
@@ -242,7 +238,7 @@ public class HdfsService implements DbStoreService {
 		} else {
 			log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
 		}
-		
+
 	}
-	
+
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index 5cc4070d..a044790e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -48,6 +48,7 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientOptions.Builder;
 import com.mongodb.MongoCredential;
+import com.mongodb.MongoTimeoutException;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -84,7 +85,8 @@ public class MongodbService implements DbStoreService {
 	}
 
 	@PostConstruct
-	private void init() {
+	@Override
+	public void init() {
 		String host = mongodb.getHost();
 		Integer port = mongodb.getPort();
 
@@ -172,6 +174,8 @@ public class MongodbService implements DbStoreService {
 			for (BulkWriteError bulkWriteError : bulkWriteErrors) {
 				log.error("Failed record: {}", bulkWriteError);
 			}
+		} catch (MongoTimeoutException e) {
+			log.error("saveJsons()", e);
 		}
 
 		log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
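
Reviewer note: Db and TopicConfig now implement equals() and hashCode() over the numeric surrogate id instead of the name, so two objects with the same id are equal even when their names differ. A minimal sketch of the new contract; it assumes Db's no-argument constructor and Lombok-generated setters (only the equals()/hashCode() behavior comes from this patch):

import org.onap.datalake.feeder.domain.Db;

public class DbEqualitySketch {
	public static void main(String[] args) {
		Db db1 = new Db(); // assumed no-arg constructor (JPA entity)
		db1.setId(1);      // assumed Lombok setter
		db1.setName("Couchbase");

		Db db2 = new Db();
		db2.setId(1);
		db2.setName("Elasticsearch");

		System.out.println(db1.equals(db2));                  // true: same id, names are ignored
		System.out.println(db1.hashCode() == db2.hashCode()); // true: hashCode() is the id itself
	}
}

A consequence of this design choice is that hash-based collections, such as the dbs collection iterated in the removed supportDb() stub, key on id alone and so stay stable if a database entry is renamed.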
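The other recurring change is promoting each service's init() from a private @PostConstruct method to a public @Override of the new DbStoreService.init(). Outside a Spring container @PostConstruct is never invoked, so plain unit tests previously had to reach init() via reflection; now a test can drive initialization through the interface, which matches the "Unit test" intent of this commit. A hypothetical JUnit 4 sketch (the test class and Db setup are illustrative, not from the patch; without a live cluster init() is expected to just log a connection error):

import org.junit.Test;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.service.db.DbStoreService;
import org.onap.datalake.feeder.service.db.HdfsService;

public class HdfsServiceInitSketch {

	@Test
	public void initIsCallableWithoutSpring() {
		Db db = new Db(); // illustrative: a real test would populate host/port/login
		DbStoreService service = new HdfsService(db); // constructor shown in this patch

		// Previously private and reachable only through @PostConstruct;
		// now an explicit entry point on the interface.
		service.init();
	}
}

Note that @PostConstruct is kept on each implementation, so container-managed beans still initialize exactly as before; the interface method only adds a second, explicit entry point.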
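Finally, MongodbService.saveJsons() now also catches MongoTimeoutException, so an unreachable or slow MongoDB is logged rather than propagated out of the save path. A condensed sketch of the handler's shape around a bulk insert (the try body and the bulk-write catch paraphrase the existing code; only the timeout catch is new in this patch):

import java.util.List;

import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.client.MongoCollection;

public class SaveJsonsSketch {

	private static final Logger log = LoggerFactory.getLogger(SaveJsonsSketch.class);

	void save(MongoCollection<Document> collection, List<Document> documents) {
		try {
			collection.insertMany(documents);
		} catch (MongoBulkWriteException e) { // pre-existing path: report each failed record
			for (BulkWriteError bulkWriteError : e.getWriteErrors()) {
				log.error("Failed record: {}", bulkWriteError);
			}
		} catch (MongoTimeoutException e) { // new: log the timeout and keep going
			log.error("saveJsons()", e);
		}
	}
}

Both exceptions extend MongoException, so a broad catch (MongoException e) placed first would make the timeout branch unreachable; keeping the two specific handlers side by side avoids that.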