summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-04-29 17:05:39 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-04-29 20:59:56 -0700
commit2920426bd0f8369a178895138e97b0b19372c413 (patch)
tree6cc264e2db5608118b58e8d0cdca360ff6ae95f3 /components/datalake-handler/feeder/src/main/java/org
parentf411c14f0d8ee65008d83b4a8a7bd719310a587b (diff)
Use TopicConfig in backend4.0.0-ONAP1.0.0
Issue-ID: DCAEGEN2-1200 Change-Id: Ia18993524c272c42ba48485a636f04f94af0cd0a Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java25
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java115
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java96
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java5
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java34
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java10
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java17
11 files changed, 157 insertions, 165 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index a3add0ed..d59c0fc1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -45,6 +45,7 @@ public class ApplicationConfiguration {
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
private long dmaapKafkaTimeout;
+ private String[] dmaapKafkaExclude;
private int dmaapCheckNewTopicIntervalInSec;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index f08a994d..d3a1fce3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -55,10 +55,10 @@ import io.swagger.annotations.ApiOperation;
/**
* This controller manages topic settings.
*
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
- * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
- * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
- * script.
+ * Topic "_DL_DEFAULT_" acts as the default.
+ * If a topic is not present in database, "_DL_DEFAULT_" is used for it.
+ * If a topic is present in database, itself should be complete, and no setting from "_DL_DEFAULT_" is used.
+ * Topic "_DL_DEFAULT_" is populated at setup by a DB script.
*
* @author Guobiao Mo
* @contributor Kate Hsuan @ QCT
@@ -88,7 +88,7 @@ public class TopicController {
@GetMapping("")
@ResponseBody
- @ApiOperation(value="List all topics in database")
+ @ApiOperation(value="List all topic names in database")
public List<String> list() {
Iterable<Topic> ret = topicRepository.findAll();
List<String> retString = new ArrayList<>();
@@ -114,13 +114,11 @@ public class TopicController {
sendError(response, 400, "Topic already exists "+topicConfig.getName());
return null;
} else {
- PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
if(wTopic.getTtl() == 0)
wTopic.setTtl(3650);
- topicRepository.save(wTopic);
- mkPostReturnBody(retBody, 200, wTopic);
- return retBody;
+ topicRepository.save(wTopic);
+ return mkPostReturnBody(200, wTopic);
}
}
@@ -159,18 +157,19 @@ public class TopicController {
sendError(response, 404, "Topic not found "+topicConfig.getName());
return null;
} else {
- PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
topicService.fillTopicConfiguration(topicConfig, oldTopic);
topicRepository.save(oldTopic);
- mkPostReturnBody(retBody, 200, oldTopic);
- return retBody;
+ return mkPostReturnBody(200, oldTopic);
}
}
- private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
+ private PostReturnBody<TopicConfig> mkPostReturnBody(int statusCode, Topic topic)
{
+ PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
retBody.setStatusCode(statusCode);
retBody.setReturnBody(topic.getTopicConfig());
+
+ return retBody;
}
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 06c6b8cc..c618f57f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -32,10 +32,7 @@ import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.Table;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -54,11 +51,10 @@ import lombok.Setter;
@Table(name = "topic")
public class Topic {
@Id
- @Column(name="`name`")
+ @Column(name = "`name`")
private String name;//topic name
-
- //for protected Kafka topics
+ //for protected Kafka topics
@Column(name = "`login`")
private String login;
@@ -69,16 +65,13 @@ public class Topic {
@JsonBackReference
//@JsonManagedReference
@ManyToMany(fetch = FetchType.EAGER)
- @JoinTable( name = "map_db_topic",
- joinColumns = { @JoinColumn(name="topic_name") },
- inverseJoinColumns = { @JoinColumn(name="db_name") }
- )
+ @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") })
protected Set<Db> dbs;
/**
* indicate if we should monitor this topic
*/
- @Column(name="`enabled`")
+ @Column(name = "`enabled`")
private Boolean enabled;
/**
@@ -88,10 +81,10 @@ public class Topic {
private Boolean saveRaw;
/**
- * need to explicitly tell feeder the data format of the message.
- * support JSON, XML, YAML, TEXT
+ * need to explicitly tell feeder the data format of the message. support JSON,
+ * XML, YAML, TEXT
*/
- @Column(name="`data_format`")
+ @Column(name = "`data_format`")
private String dataFormat;
/**
@@ -114,22 +107,6 @@ public class Topic {
this.name = name;
}
- public Topic clone() { //TODO will use TopicConfig
- Topic ret = new Topic();
- ret.setCorrelateClearedMessage(correlateClearedMessage);
- ret.setDataFormat(dataFormat);
- ret.setDbs(dbs);
- ret.setEnabled(enabled);
- ret.setLogin(login);
- ret.setMessageIdPath(messageIdPath);
- ret.setName(name);
- ret.setPass(pass);
- ret.setSaveRaw(saveRaw);
- ret.setTtl(ttl);
-
- return ret;
- }
-
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
@@ -145,19 +122,11 @@ public class Topic {
public int getTtl() {
if (ttl != null) {
return ttl;
- } else {
+ } else {
return 3650;//default to 10 years for safe
}
}
- public DataFormat getDataFormat() {
- if (dataFormat != null) {
- return DataFormat.fromString(dataFormat);
- } else {
- return null;
- }
- }
-
private boolean is(Boolean b) {
return is(b, false);
}
@@ -165,7 +134,7 @@ public class Topic {
private boolean is(Boolean b, boolean defaultValue) {
if (b != null) {
return b;
- } else {
+ } else {
return defaultValue;
}
}
@@ -174,71 +143,25 @@ public class Topic {
return is(saveRaw);
}
- public boolean supportElasticsearch() {
- return containDb("Elasticsearch");//TODO string hard codes
- }
-
- public boolean supportCouchbase() {
- return containDb("Couchbase");
- }
-
- public boolean supportDruid() {
- return containDb("Druid");
- }
-
- public boolean supportMongoDB() {
- return containDb("MongoDB");
- }
-
- private boolean containDb(String dbName) {
- Db db = new Db(dbName);
-
- if (dbs != null && dbs.contains(db)) {
- return true;
- } else {
- return false;
- }
- }
-
- //extract DB id from JSON attributes, support multiple attributes
- public String getMessageId(JSONObject json) {
- String id = null;
-
- if (StringUtils.isNotBlank(messageIdPath)) {
- String[] paths = messageIdPath.split(",");
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < paths.length; i++) {
- if (i > 0) {
- sb.append('^');
- }
- sb.append(json.query(paths[i]).toString());
- }
- id = sb.toString();
- }
-
- return id;
- }
-
public TopicConfig getTopicConfig() {
TopicConfig tConfig = new TopicConfig();
-
+
tConfig.setName(getName());
- tConfig.setEnable(getEnabled());
- if(getDataFormat() != null)
- tConfig.setDataFormat(getDataFormat().toString());
- tConfig.setSaveRaw(getSaveRaw());
- tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
+ tConfig.setEnabled(isEnabled());
+ tConfig.setDataFormat(dataFormat);
+ tConfig.setSaveRaw(isSaveRaw());
+ tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
tConfig.setMessageIdPath(getMessageIdPath());
tConfig.setTtl(getTtl());
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
- for(Db item: topicDb)
- {
- dbList.add(item.getName());
+ if (topicDb != null) {
+ for (Db item : topicDb) {
+ dbList.add(item.getName());
+ }
}
tConfig.setSinkdbs(dbList);
-
+
return tConfig;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 76b41cb5..15ffc8a3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -22,14 +22,12 @@ package org.onap.datalake.feeder.dto;
import lombok.Getter;
import lombok.Setter;
-import org.onap.datalake.feeder.domain.Topic;
-import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.repository.DbRepository;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.enumeration.DataFormat;
/**
* JSON request body for Topic manipulation.
@@ -43,17 +41,85 @@ import java.util.Set;
public class TopicConfig {
- private String name;
- private String login;
- private String password;
- private List<String> sinkdbs;
- private boolean enable;
- private boolean saveRaw;
- private String dataFormat;
- private int ttl;
- private boolean correlatedClearredMessage;
- private String messageIdPath;
+ private String name;
+ private String login;
+ private String password;
+ private List<String> sinkdbs;
+ private boolean enabled;
+ private boolean saveRaw;
+ private String dataFormat;
+ private int ttl;
+ private boolean correlateClearedMessage;
+ private String messageIdPath;
+ public DataFormat getDataFormat2() {
+ if (dataFormat != null) {
+ return DataFormat.fromString(dataFormat);
+ } else {
+ return null;
+ }
+ }
+
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO string hard codes
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
+ }
+
+ public boolean supportDruid() {
+ return containDb("Druid");
+ }
+
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
+ }
+
+ private boolean containDb(String dbName) {
+ return (sinkdbs != null && sinkdbs.contains(dbName));
+ }
+
+ //extract DB id from JSON attributes, support multiple attributes
+ public String getMessageId(JSONObject json) {
+ String id = null;
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
+ }
+
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((TopicConfig) obj).getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 1e5fb78b..12d03ee6 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -30,6 +30,7 @@ import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class CouchbaseService {
bucket.close();
}
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
List<JsonDocument> documents= new ArrayList<>(jsons.size());
for(JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
@@ -109,7 +110,7 @@ public class CouchbaseService {
log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
- public String getId(Topic topic, JSONObject json) {
+ public String getId(TopicConfig topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
if(id != null) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 270db932..de8c9e89 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -25,13 +25,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import javax.annotation.PostConstruct;
-
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -54,12 +52,6 @@ public class DmaapService {
@Autowired
private TopicService topicService;
-
- @PostConstruct
- private void init() {
-
- }
-
//get all topic names from Zookeeper
public List<String> getTopics() {
try {
@@ -67,28 +59,32 @@ public class DmaapService {
@Override
public void process(WatchedEvent event) {
// TODO monitor new topics
-
+
}
- };
+ };
ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
- List<String> topics = zk.getChildren("/brokers/topics", false);
- return topics;
+ List<String> topics = zk.getChildren("/brokers/topics", false);
+ String[] excludes = config.getDmaapKafkaExclude();
+ for (String exclude : excludes) {
+ topics.remove(exclude);
+ }
+ return topics;
} catch (Exception e) {
log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
- return null;
+ return Collections.emptyList();
}
- }
+ }
- public List<String> getActiveTopics() throws IOException {
+ public List<String> getActiveTopics() throws IOException {
List<String> allTopics = getTopics();
- if(allTopics == null) {
+ if (allTopics == null) {
return Collections.emptyList();
}
List<String> ret = new ArrayList<>();
for (String topicStr : allTopics) {
- Topic topic = topicService.getEffectiveTopic(topicStr, true);
- if (topic.isEnabled()) {
+ TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
+ if (topicConfig.isEnabled()) {
ret.add(topicStr);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 30aa7332..4090e7eb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -47,7 +47,7 @@ import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +112,7 @@ public class ElasticsearchService {
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if(!exists){
+ //TODO submit mapping template
CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
@@ -119,7 +120,7 @@ public class ElasticsearchService {
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
@@ -134,6 +135,9 @@ public class ElasticsearchService {
request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
+
+ log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
+
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
}else {
@@ -155,7 +159,7 @@ public class ElasticsearchService {
* The search API can only query all data or based on the fields in the source.
* So use the get API, three parameters: index, type, document id
*/
- private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+ private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
boolean found = false;
String eName = null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index fb3f806c..02c80a45 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -34,7 +34,7 @@ import org.bson.Document;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,6 @@ import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
-import com.mongodb.DB;
/**
* Service for using MongoDB
@@ -131,7 +130,7 @@ public class MongodbService {
mongoClient.close();
}
- public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
if(dbReady == false)
return;
List<Document> documents = new ArrayList<>(jsons.size());
@@ -150,7 +149,7 @@ public class MongodbService {
MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
collection.insertMany(documents);
- log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+ log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index ce671a90..b3a6d29a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -82,6 +82,7 @@ public class PullThread implements Runnable {
private void init() {
async = config.isAsync();
Properties consumerConfig = getConsumerConfig();
+ log.info("Kafka ConsumerConfig: {}", consumerConfig);
consumer = new KafkaConsumer<>(consumerConfig);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index a4f79107..449dacfc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -36,6 +36,7 @@ import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
- private Map<String, Topic> topicMap = new HashMap<>();
+ private Map<String, TopicConfig> topicMap = new HashMap<>();
private ObjectMapper yamlReader;
@@ -94,7 +95,7 @@ public class StoreService {
return;
}
- Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
+ TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
return topicService.getEffectiveTopic(topicStr);
});
@@ -111,7 +112,7 @@ public class StoreService {
saveJsons(topic, docs);
}
- private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+ private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -126,7 +127,7 @@ public class StoreService {
JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat();
+ DataFormat dataFormat = topic.getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -160,7 +161,7 @@ public class StoreService {
return json;
}
- private void saveJsons(Topic topic, List<JSONObject> jsons) {
+ private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
if (topic.supportMongoDB()) {
mongodbService.saveJsons(topic, jsons);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 7ae3ff71..f0b000bc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -60,7 +60,7 @@ public class TopicService {
@Autowired
private DbRepository dbRepository;
- public Topic getEffectiveTopic(String topicStr) {
+ public TopicConfig getEffectiveTopic(String topicStr) {
try {
return getEffectiveTopic(topicStr, false);
} catch (IOException e) {
@@ -69,17 +69,18 @@ public class TopicService {
return null;
}
- public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
+ public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = getDefaultTopic().clone();
- topic.setName(topicStr);
+ topic = getDefaultTopic();
}
+ TopicConfig topicConfig = topic.getTopicConfig();
+ topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic
- if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
+ if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) {
elasticsearchService.ensureTableExist(topicStr);
}
- return topic;
+ return topicConfig;
}
public Topic getTopic(String topicStr) {
@@ -116,10 +117,10 @@ public class TopicService {
topic.setName(tConfig.getName());
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
- topic.setEnabled(tConfig.isEnable());
+ topic.setEnabled(tConfig.isEnabled());
topic.setSaveRaw(tConfig.isSaveRaw());
topic.setTtl(tConfig.getTtl());
- topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage());
+ topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
topic.setDataFormat(tConfig.getDataFormat());
topic.setMessageIdPath(tConfig.getMessageIdPath());