summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-04-29 17:05:39 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-04-29 20:59:56 -0700
commit2920426bd0f8369a178895138e97b0b19372c413 (patch)
tree6cc264e2db5608118b58e8d0cdca360ff6ae95f3
parentf411c14f0d8ee65008d83b4a8a7bd719310a587b (diff)
Use TopicConfig in backend4.0.0-ONAP1.0.0
Issue-ID: DCAEGEN2-1200 Change-Id: Ia18993524c272c42ba48485a636f04f94af0cd0a Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java25
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java115
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java96
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java5
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java34
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java10
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java17
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties2
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java4
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java39
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java100
-rwxr-xr-xcomponents/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java4
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java2
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java2
19 files changed, 269 insertions, 213 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index 44f4ef17..e201242d 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -11,10 +11,7 @@ CREATE TABLE `topic` (
`save_raw` bit(1) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
`data_format` varchar(255) DEFAULT NULL,
- `default_topic` varchar(255) DEFAULT NULL,
- PRIMARY KEY (`name`),
- KEY `FK_default_topic` (`default_topic`),
- CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)
+ PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
@@ -52,7 +49,7 @@ insert into db (`name`,`host`) values ('Druid','dl_druid');
-- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
-insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index a3add0ed..d59c0fc1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -45,6 +45,7 @@ public class ApplicationConfiguration {
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
private long dmaapKafkaTimeout;
+ private String[] dmaapKafkaExclude;
private int dmaapCheckNewTopicIntervalInSec;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index f08a994d..d3a1fce3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -55,10 +55,10 @@ import io.swagger.annotations.ApiOperation;
/**
* This controller manages topic settings.
*
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
- * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
- * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
- * script.
+ * Topic "_DL_DEFAULT_" acts as the default.
+ * If a topic is not present in database, "_DL_DEFAULT_" is used for it.
+ * If a topic is present in database, itself should be complete, and no setting from "_DL_DEFAULT_" is used.
+ * Topic "_DL_DEFAULT_" is populated at setup by a DB script.
*
* @author Guobiao Mo
* @contributor Kate Hsuan @ QCT
@@ -88,7 +88,7 @@ public class TopicController {
@GetMapping("")
@ResponseBody
- @ApiOperation(value="List all topics in database")
+ @ApiOperation(value="List all topic names in database")
public List<String> list() {
Iterable<Topic> ret = topicRepository.findAll();
List<String> retString = new ArrayList<>();
@@ -114,13 +114,11 @@ public class TopicController {
sendError(response, 400, "Topic already exists "+topicConfig.getName());
return null;
} else {
- PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
if(wTopic.getTtl() == 0)
wTopic.setTtl(3650);
- topicRepository.save(wTopic);
- mkPostReturnBody(retBody, 200, wTopic);
- return retBody;
+ topicRepository.save(wTopic);
+ return mkPostReturnBody(200, wTopic);
}
}
@@ -159,18 +157,19 @@ public class TopicController {
sendError(response, 404, "Topic not found "+topicConfig.getName());
return null;
} else {
- PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
topicService.fillTopicConfiguration(topicConfig, oldTopic);
topicRepository.save(oldTopic);
- mkPostReturnBody(retBody, 200, oldTopic);
- return retBody;
+ return mkPostReturnBody(200, oldTopic);
}
}
- private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
+ private PostReturnBody<TopicConfig> mkPostReturnBody(int statusCode, Topic topic)
{
+ PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
retBody.setStatusCode(statusCode);
retBody.setReturnBody(topic.getTopicConfig());
+
+ return retBody;
}
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 06c6b8cc..c618f57f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -32,10 +32,7 @@ import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.Table;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -54,11 +51,10 @@ import lombok.Setter;
@Table(name = "topic")
public class Topic {
@Id
- @Column(name="`name`")
+ @Column(name = "`name`")
private String name;//topic name
-
- //for protected Kafka topics
+ //for protected Kafka topics
@Column(name = "`login`")
private String login;
@@ -69,16 +65,13 @@ public class Topic {
@JsonBackReference
//@JsonManagedReference
@ManyToMany(fetch = FetchType.EAGER)
- @JoinTable( name = "map_db_topic",
- joinColumns = { @JoinColumn(name="topic_name") },
- inverseJoinColumns = { @JoinColumn(name="db_name") }
- )
+ @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") })
protected Set<Db> dbs;
/**
* indicate if we should monitor this topic
*/
- @Column(name="`enabled`")
+ @Column(name = "`enabled`")
private Boolean enabled;
/**
@@ -88,10 +81,10 @@ public class Topic {
private Boolean saveRaw;
/**
- * need to explicitly tell feeder the data format of the message.
- * support JSON, XML, YAML, TEXT
+ * need to explicitly tell feeder the data format of the message. support JSON,
+ * XML, YAML, TEXT
*/
- @Column(name="`data_format`")
+ @Column(name = "`data_format`")
private String dataFormat;
/**
@@ -114,22 +107,6 @@ public class Topic {
this.name = name;
}
- public Topic clone() { //TODO will use TopicConfig
- Topic ret = new Topic();
- ret.setCorrelateClearedMessage(correlateClearedMessage);
- ret.setDataFormat(dataFormat);
- ret.setDbs(dbs);
- ret.setEnabled(enabled);
- ret.setLogin(login);
- ret.setMessageIdPath(messageIdPath);
- ret.setName(name);
- ret.setPass(pass);
- ret.setSaveRaw(saveRaw);
- ret.setTtl(ttl);
-
- return ret;
- }
-
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
@@ -145,19 +122,11 @@ public class Topic {
public int getTtl() {
if (ttl != null) {
return ttl;
- } else {
+ } else {
return 3650;//default to 10 years for safe
}
}
- public DataFormat getDataFormat() {
- if (dataFormat != null) {
- return DataFormat.fromString(dataFormat);
- } else {
- return null;
- }
- }
-
private boolean is(Boolean b) {
return is(b, false);
}
@@ -165,7 +134,7 @@ public class Topic {
private boolean is(Boolean b, boolean defaultValue) {
if (b != null) {
return b;
- } else {
+ } else {
return defaultValue;
}
}
@@ -174,71 +143,25 @@ public class Topic {
return is(saveRaw);
}
- public boolean supportElasticsearch() {
- return containDb("Elasticsearch");//TODO string hard codes
- }
-
- public boolean supportCouchbase() {
- return containDb("Couchbase");
- }
-
- public boolean supportDruid() {
- return containDb("Druid");
- }
-
- public boolean supportMongoDB() {
- return containDb("MongoDB");
- }
-
- private boolean containDb(String dbName) {
- Db db = new Db(dbName);
-
- if (dbs != null && dbs.contains(db)) {
- return true;
- } else {
- return false;
- }
- }
-
- //extract DB id from JSON attributes, support multiple attributes
- public String getMessageId(JSONObject json) {
- String id = null;
-
- if (StringUtils.isNotBlank(messageIdPath)) {
- String[] paths = messageIdPath.split(",");
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < paths.length; i++) {
- if (i > 0) {
- sb.append('^');
- }
- sb.append(json.query(paths[i]).toString());
- }
- id = sb.toString();
- }
-
- return id;
- }
-
public TopicConfig getTopicConfig() {
TopicConfig tConfig = new TopicConfig();
-
+
tConfig.setName(getName());
- tConfig.setEnable(getEnabled());
- if(getDataFormat() != null)
- tConfig.setDataFormat(getDataFormat().toString());
- tConfig.setSaveRaw(getSaveRaw());
- tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
+ tConfig.setEnabled(isEnabled());
+ tConfig.setDataFormat(dataFormat);
+ tConfig.setSaveRaw(isSaveRaw());
+ tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
tConfig.setMessageIdPath(getMessageIdPath());
tConfig.setTtl(getTtl());
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
- for(Db item: topicDb)
- {
- dbList.add(item.getName());
+ if (topicDb != null) {
+ for (Db item : topicDb) {
+ dbList.add(item.getName());
+ }
}
tConfig.setSinkdbs(dbList);
-
+
return tConfig;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 76b41cb5..15ffc8a3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -22,14 +22,12 @@ package org.onap.datalake.feeder.dto;
import lombok.Getter;
import lombok.Setter;
-import org.onap.datalake.feeder.domain.Topic;
-import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.repository.DbRepository;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.enumeration.DataFormat;
/**
* JSON request body for Topic manipulation.
@@ -43,17 +41,85 @@ import java.util.Set;
public class TopicConfig {
- private String name;
- private String login;
- private String password;
- private List<String> sinkdbs;
- private boolean enable;
- private boolean saveRaw;
- private String dataFormat;
- private int ttl;
- private boolean correlatedClearredMessage;
- private String messageIdPath;
+ private String name;
+ private String login;
+ private String password;
+ private List<String> sinkdbs;
+ private boolean enabled;
+ private boolean saveRaw;
+ private String dataFormat;
+ private int ttl;
+ private boolean correlateClearedMessage;
+ private String messageIdPath;
+ public DataFormat getDataFormat2() {
+ if (dataFormat != null) {
+ return DataFormat.fromString(dataFormat);
+ } else {
+ return null;
+ }
+ }
+
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO string hard codes
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
+ }
+
+ public boolean supportDruid() {
+ return containDb("Druid");
+ }
+
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
+ }
+
+ private boolean containDb(String dbName) {
+ return (sinkdbs != null && sinkdbs.contains(dbName));
+ }
+
+ //extract DB id from JSON attributes, support multiple attributes
+ public String getMessageId(JSONObject json) {
+ String id = null;
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
+ }
+
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((TopicConfig) obj).getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 1e5fb78b..12d03ee6 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -30,6 +30,7 @@ import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class CouchbaseService {
bucket.close();
}
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
List<JsonDocument> documents= new ArrayList<>(jsons.size());
for(JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
@@ -109,7 +110,7 @@ public class CouchbaseService {
log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
- public String getId(Topic topic, JSONObject json) {
+ public String getId(TopicConfig topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
if(id != null) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 270db932..de8c9e89 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -25,13 +25,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import javax.annotation.PostConstruct;
-
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -54,12 +52,6 @@ public class DmaapService {
@Autowired
private TopicService topicService;
-
- @PostConstruct
- private void init() {
-
- }
-
//get all topic names from Zookeeper
public List<String> getTopics() {
try {
@@ -67,28 +59,32 @@ public class DmaapService {
@Override
public void process(WatchedEvent event) {
// TODO monitor new topics
-
+
}
- };
+ };
ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
- List<String> topics = zk.getChildren("/brokers/topics", false);
- return topics;
+ List<String> topics = zk.getChildren("/brokers/topics", false);
+ String[] excludes = config.getDmaapKafkaExclude();
+ for (String exclude : excludes) {
+ topics.remove(exclude);
+ }
+ return topics;
} catch (Exception e) {
log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
- return null;
+ return Collections.emptyList();
}
- }
+ }
- public List<String> getActiveTopics() throws IOException {
+ public List<String> getActiveTopics() throws IOException {
List<String> allTopics = getTopics();
- if(allTopics == null) {
+ if (allTopics == null) {
return Collections.emptyList();
}
List<String> ret = new ArrayList<>();
for (String topicStr : allTopics) {
- Topic topic = topicService.getEffectiveTopic(topicStr, true);
- if (topic.isEnabled()) {
+ TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
+ if (topicConfig.isEnabled()) {
ret.add(topicStr);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 30aa7332..4090e7eb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -47,7 +47,7 @@ import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +112,7 @@ public class ElasticsearchService {
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if(!exists){
+ //TODO submit mapping template
CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
@@ -119,7 +120,7 @@ public class ElasticsearchService {
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
@@ -134,6 +135,9 @@ public class ElasticsearchService {
request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
+
+ log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
+
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
}else {
@@ -155,7 +159,7 @@ public class ElasticsearchService {
* The search API can only query all data or based on the fields in the source.
* So use the get API, three parameters: index, type, document id
*/
- private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+ private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
boolean found = false;
String eName = null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index fb3f806c..02c80a45 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -34,7 +34,7 @@ import org.bson.Document;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,6 @@ import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
-import com.mongodb.DB;
/**
* Service for using MongoDB
@@ -131,7 +130,7 @@ public class MongodbService {
mongoClient.close();
}
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
if(dbReady == false)
return;
List<Document> documents = new ArrayList<>(jsons.size());
@@ -150,7 +149,7 @@ public class MongodbService {
MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
collection.insertMany(documents);
- log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+ log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index ce671a90..b3a6d29a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -82,6 +82,7 @@ public class PullThread implements Runnable {
private void init() {
async = config.isAsync();
Properties consumerConfig = getConsumerConfig();
+ log.info("Kafka ConsumerConfig: {}", consumerConfig);
consumer = new KafkaConsumer<>(consumerConfig);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index a4f79107..449dacfc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -36,6 +36,7 @@ import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
- private Map<String, Topic> topicMap = new HashMap<>();
+ private Map<String, TopicConfig> topicMap = new HashMap<>();
private ObjectMapper yamlReader;
@@ -94,7 +95,7 @@ public class StoreService {
return;
}
- Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
+ TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
return topicService.getEffectiveTopic(topicStr);
});
@@ -111,7 +112,7 @@ public class StoreService {
saveJsons(topic, docs);
}
- private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+ private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -126,7 +127,7 @@ public class StoreService {
JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat();
+ DataFormat dataFormat = topic.getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -160,7 +161,7 @@ public class StoreService {
return json;
}
- private void saveJsons(Topic topic, List<JSONObject> jsons) {
+ private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
if (topic.supportMongoDB()) {
mongodbService.saveJsons(topic, jsons);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 7ae3ff71..f0b000bc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -60,7 +60,7 @@ public class TopicService {
@Autowired
private DbRepository dbRepository;
- public Topic getEffectiveTopic(String topicStr) {
+ public TopicConfig getEffectiveTopic(String topicStr) {
try {
return getEffectiveTopic(topicStr, false);
} catch (IOException e) {
@@ -69,17 +69,18 @@ public class TopicService {
return null;
}
- public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
+ public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = getDefaultTopic().clone();
- topic.setName(topicStr);
+ topic = getDefaultTopic();
}
+ TopicConfig topicConfig = topic.getTopicConfig();
+ topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic
- if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
+ if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) {
elasticsearchService.ensureTableExist(topicStr);
}
- return topic;
+ return topicConfig;
}
public Topic getTopic(String topicStr) {
@@ -116,10 +117,10 @@ public class TopicService {
topic.setName(tConfig.getName());
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
- topic.setEnabled(tConfig.isEnable());
+ topic.setEnabled(tConfig.isEnabled());
topic.setSaveRaw(tConfig.isSaveRaw());
topic.setTtl(tConfig.getTtl());
- topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage());
+ topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
topic.setDataFormat(tConfig.getDataFormat());
topic.setMessageIdPath(tConfig.getMessageIdPath());
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index dfe48a29..e1b83999 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -27,6 +27,8 @@ dmaapZookeeperHostPort=message-router-zookeeper:2181
dmaapKafkaHostPort=message-router-kafka:9092
dmaapKafkaGroup=dlgroup10
dmaapKafkaTimeout=60
+dmaapKafkaExclude[0]=__consumer_offsets
+dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap
#check for new topics
dmaapCheckNewTopicIntervalInSec=3000
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
index 7c2bf916..e96d940c 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
@@ -136,12 +136,12 @@ public class TopicControllerTest {
when(topicRepository.findById("a")).thenReturn(Optional.of(a));
TopicConfig ac = new TopicConfig();
ac.setName("a");
- ac.setEnable(true);
+ ac.setEnabled(true);
PostReturnBody<TopicConfig> postConfig1 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
assertEquals(200, postConfig1.getStatusCode());
TopicConfig ret = postConfig1.getReturnBody();
assertEquals("a", ret.getName());
- assertEquals(true, ret.isEnable());
+ assertEquals(true, ret.isEnabled());
when(mockBindingResult.hasErrors()).thenReturn(true);
PostReturnBody<TopicConfig> postConfig2 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
assertEquals(null, postConfig2);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
index b583473a..74f0884f 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
@@ -19,7 +19,6 @@
*/
package org.onap.datalake.feeder.domain;
-import org.json.JSONObject;
import org.junit.Test;
import org.onap.datalake.feeder.enumeration.DataFormat;
@@ -27,7 +26,6 @@ import java.util.HashSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -39,34 +37,8 @@ import static org.junit.Assert.assertTrue;
public class TopicTest {
@Test
- public void getMessageId() {
- String text = "{ data: { data2 : { value : 'hello'}}}";
-
- JSONObject json = new JSONObject(text);
-
- Topic topic = new Topic("test getMessageId");
- topic.setMessageIdPath("/data/data2/value");
-
- String value = topic.getMessageId(json);
-
- assertEquals(value, "hello");
- }
-
- @Test
public void getMessageIdFromMultipleAttributes() {
- String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
-
- JSONObject json = new JSONObject(text);
-
- Topic topic = new Topic("test getMessageId");
- topic.setMessageIdPath("/data/data2/value,/data/data3");
-
- String value = topic.getMessageId(json);
- assertEquals(value, "hello^world");
-
- topic.setMessageIdPath("");
- assertNull(topic.getMessageId(json));
-
+ Topic topic = new Topic("test getMessageId");
Topic defaultTopic = new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
@@ -99,13 +71,6 @@ public class TopicTest {
defaultTopic.setDbs(new HashSet<>());
defaultTopic.getDbs().add(new Db("Elasticsearch"));
- assertTrue(defaultTopic.supportElasticsearch());
- assertFalse(testTopic.supportCouchbase());
- assertFalse(testTopic.supportDruid());
- assertFalse(testTopic.supportMongoDB());
-
- defaultTopic.getDbs().remove(new Db("Elasticsearch"));
- assertFalse(testTopic.supportElasticsearch());
assertEquals(defaultTopic.getDataFormat(), null);
defaultTopic.setCorrelateClearedMessage(true);
@@ -116,7 +81,7 @@ public class TopicTest {
assertTrue(defaultTopic.isEnabled());
assertTrue(defaultTopic.isSaveRaw());
- assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
+ assertEquals(defaultTopic.getTopicConfig().getDataFormat2(), DataFormat.XML);
defaultTopic.setDataFormat(null);
assertEquals(testTopic.getDataFormat(), null);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
new file mode 100644
index 00000000..c65e920e
--- /dev/null
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.datalake.feeder.dto;
+
+import org.json.JSONObject;
+import org.junit.Test;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Topic
+ *
+ * @author Guobiao Mo
+ */
+
+public class TopicConfigTest {
+
+ @Test
+ public void getMessageId() {
+ String text = "{ data: { data2 : { value : 'hello'}}}";
+
+ JSONObject json = new JSONObject(text);
+
+ Topic topic = new Topic("test getMessageId");
+ topic.setMessageIdPath("/data/data2/value");
+
+ TopicConfig topicConfig = topic.getTopicConfig();
+
+ String value = topicConfig.getMessageId(json);
+
+ assertEquals(value, "hello");
+ }
+
+ @Test
+ public void getMessageIdFromMultipleAttributes() {
+ String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+
+ JSONObject json = new JSONObject(text);
+
+ Topic topic = new Topic("test getMessageId");
+ topic.setMessageIdPath("/data/data2/value,/data/data3");
+
+ TopicConfig topicConfig = topic.getTopicConfig();
+
+ String value = topicConfig.getMessageId(json);
+ assertEquals(value, "hello^world");
+
+ topic.setMessageIdPath("");
+ topicConfig = topic.getTopicConfig();
+ assertNull(topicConfig.getMessageId(json));
+
+ }
+
+ @Test
+ public void testIs() {
+ Topic testTopic = new Topic("test");
+
+ assertTrue(testTopic.equals(new Topic("test")));
+ assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+
+ testTopic.setDbs(new HashSet<>());
+ testTopic.getDbs().add(new Db("Elasticsearch"));
+
+ TopicConfig testTopicConfig = testTopic.getTopicConfig();
+
+ assertTrue(testTopicConfig.supportElasticsearch());
+ assertFalse(testTopicConfig.supportCouchbase());
+ assertFalse(testTopicConfig.supportDruid());
+ assertFalse(testTopicConfig.supportMongoDB());
+
+ testTopic.getDbs().remove(new Db("Elasticsearch"));
+ testTopicConfig = testTopic.getTopicConfig();
+ assertFalse(testTopicConfig.supportElasticsearch());
+
+ }
+}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
index 9e1b2d99..0efde44c 100755
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
@@ -114,7 +114,7 @@ public class CouchbaseServiceTest {
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
couchbaseService.config = appConfig;
- couchbaseService.saveJsons(topic, jsons);
+ couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
}
@@ -134,7 +134,7 @@ public class CouchbaseServiceTest {
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
couchbaseService.config = appConfig;
- couchbaseService.saveJsons(topic, jsons);
+ couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
}
@Test
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java
index de2c1674..9590b0a4 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java
@@ -89,7 +89,7 @@ public class ElasticsearchServiceTest {
when(config.getElasticsearchType()).thenReturn("doc");
when(config.isAsync()).thenReturn(true);
- elasticsearchService.saveJsons(topic, jsons);
+ elasticsearchService.saveJsons(topic.getTopicConfig(), jsons);
}
} \ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java
index 41856760..016381be 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java
@@ -83,6 +83,6 @@ public class MongodbServiceTest {
jsons.add(jsonObject);
jsons.add(jsonObject2);
- mongodbService.saveJsons(topic, jsons);
+ mongodbService.saveJsons(topic.getTopicConfig(), jsons);
}
} \ No newline at end of file