summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-04-23 15:46:15 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-04-23 15:46:15 -0700
commitc82dafde6b3f02cccba0822a340cb9d7bed35a1c (patch)
treeaa1593e841f213633873c9aff1bd2341d2f979ba /components/datalake-handler/feeder/src/main/java/org
parent3ab4672bacf1870722e9129f9cd7f7fa18f602f2 (diff)
Move hard-coded strings to app configuration
Issue-ID: DCAEGEN2-1452 Change-Id: I6577ba3b5a8f877bbaa13d0c590d6a2072331136 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java71
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java32
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java12
8 files changed, 79 insertions, 56 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 6bacf136..21ca1b70 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -53,4 +53,8 @@ public class ApplicationConfiguration {
private boolean async;
private boolean enableSSL;
+ private String timestampLabel;
+ private String rawDataLabel;
+
+ private String defaultTopicName;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
index 4bfe7aa3..2b724c72 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -192,6 +192,7 @@ public class DbController {
Db delDb = dbRepository.findByName(dbName);
if (delDb == null) {
sendError(response, 404, "Db not found: " + dbName);
+ return;
}
Set<Topic> topicRelation = delDb.getTopics();
topicRelation.clear();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 747a72c8..0869fde7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -49,18 +49,19 @@ import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.ApiOperation;
/**
- * This controller manages topic settings.
+ * This controller manages topic settings.
*
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
- * All the settings are saved in database.
- * topic "_DL_DEFAULT_" is populated at setup by a DB script.
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
+ * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
+ * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
+ * script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -70,23 +71,23 @@ public class TopicController {
@Autowired
private TopicRepository topicRepository;
-
+
@Autowired
private TopicService topicService;
@Autowired
private DbService dbService;
-
+
@GetMapping("/dmaap/")
@ResponseBody
- @ApiOperation(value="List all topic names in DMaaP.")
+ @ApiOperation(value = "List all topic names in DMaaP.")
public List<String> listDmaapTopics() throws IOException {
return dmaapService.getTopics();
}
@GetMapping("/")
@ResponseBody
- @ApiOperation(value="List all topics' settings.")
+ @ApiOperation(value = "List all topics' settings.")
public Iterable<Topic> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
return ret;
@@ -94,7 +95,7 @@ public class TopicController {
@GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value="Get a topic's settings.")
+ @ApiOperation(value = "Get a topic's settings.")
public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
return topic;
@@ -102,7 +103,7 @@ public class TopicController {
@GetMapping("/{topicName}/dbs")
@ResponseBody
- @ApiOperation(value="Get all DBs in a topic.")
+ @ApiOperation(value = "Get all DBs in a topic.")
public Set<Db> getTopicDbs(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
@@ -113,50 +114,50 @@ public class TopicController {
//One exception is that old DBs are kept
@PutMapping("/")
@ResponseBody
- @ApiOperation(value="Update a topic.")
+ @ApiOperation(value = "Update a topic.")
public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
- return null;
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
+ return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- sendError(response, 404, "Topic not found "+topic.getName());
- return null;
+ sendError(response, 404, "Topic not found " + topic.getName());
+ return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
}
-
+
@PostMapping("/")
@ResponseBody
- @ApiOperation(value="Create a new topic.")
+ @ApiOperation(value = "Create a new topic.")
public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
-
+
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- sendError(response, 400, "Topic already exists "+topic.getName());
+ sendError(response, 400, "Topic already exists " + topic.getName());
return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topicRepository.save(topic);
return topic;
}
@@ -164,32 +165,32 @@ public class TopicController {
@DeleteMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Delete a DB from a topic.")
+ @ApiOperation(value = "Delete a DB from a topic.")
public Set<Db> deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
dbs.remove(new Db(dbName));
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
@PutMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Add a DB to a topic.")
+ @ApiOperation(value = "Add a DB to a topic.")
public Set<Db> addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
+ Set<Db> dbs = topic.getDbs();
- Db db = dbService.getDb(dbName);
+ Db db = dbService.getDb(dbName);
dbs.add(db);
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
-
+
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
log.info(msg);
- response.sendError(sc, msg);
+ response.sendError(sc, msg);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index b7f89fcd..2b92e869 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -111,10 +111,6 @@ public class Topic {
this.name = name;
}
- public boolean isDefault() {
- return "_DL_DEFAULT_".equals(name);
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index a717e10a..1e5fb78b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -27,6 +27,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
@@ -57,6 +58,9 @@ public class CouchbaseService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ ApplicationConfiguration config;
+
+ @Autowired
private DbService dbService;
Bucket bucket;
@@ -73,12 +77,12 @@ public class CouchbaseService {
log.info("Connect to Couchbase {}", couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+ isReady = true;
}
catch(Exception ex)
{
isReady = false;
}
- isReady = true;
}
@PreDestroy
@@ -92,7 +96,7 @@ public class CouchbaseService {
//convert to Couchbase JsonObject from org.json JSONObject
JsonObject jsonObject = JsonObject.fromJson(json.toString());
- long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson()
+ long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
@@ -102,6 +106,7 @@ public class CouchbaseService {
documents.add(doc);
}
saveDocuments(documents);
+ log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
public String getId(Topic topic, JSONObject json) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index 3c75eb68..e9f36b2d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -125,7 +125,7 @@ public class PullThread implements Runnable {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
messages.add(Pair.of(record.timestamp(), record.value()));
- log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
}
storeService.saveMessages(partition.topic(), messages);
log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index d9fe12a7..a4f79107 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
@@ -41,16 +42,17 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
+
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
/**
* Service to store messages to varieties of DBs
*
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since AML is for config and don't see this data
+ * type in DMaaP. Do we need to support XML?
*
* @author Guobiao Mo
*
@@ -60,6 +62,9 @@ public class StoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ private ApplicationConfiguration config;
+
+ @Autowired
private TopicService topicService;
@Autowired
@@ -71,9 +76,9 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
- private Map<String, Topic> topicMap = new HashMap<>();
+ private Map<String, Topic> topicMap = new HashMap<>();
- private ObjectMapper yamlReader;
+ private ObjectMapper yamlReader;
@PostConstruct
private void init() {
@@ -112,14 +117,14 @@ public class StoreService {
String text = pair.getRight();
//for debug, to be remove
-// String topicStr = topic.getId();
-// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
+ // String topicStr = topic.getId();
+ // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+ // log.debug("{} ={}", topicStr, text);
//}
boolean storeRaw = topic.isSaveRaw();
- JSONObject json = null;
+ JSONObject json = null;
DataFormat dataFormat = topic.getDataFormat();
@@ -129,7 +134,7 @@ public class StoreService {
break;
case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
json = XML.toJSONObject(text);
- break;
+ break;
case YAML:// Do we need to support YAML?
Object obj = yamlReader.readValue(text, Object.class);
ObjectMapper jsonWriter = new ObjectMapper();
@@ -145,10 +150,11 @@ public class StoreService {
//FIXME for debug, to be remove
json.remove("_id");
json.remove("_dl_text_");
+ json.remove("_dl_type_");
- json.put("_ts", timestamp);
+ json.put(config.getTimestampLabel(), timestamp);
if (storeRaw) {
- json.put("_text", text);
+ json.put(config.getRawDataLabel(), text);
}
return json;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 7e0c7780..ea1eb4c4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.Optional;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.slf4j.Logger;
@@ -42,6 +43,9 @@ public class TopicService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ private ApplicationConfiguration config;
+
+ @Autowired
private TopicRepository topicRepository;
@Autowired
@@ -76,7 +80,13 @@ public class TopicService {
}
public Topic getDefaultTopic() {
- return getTopic("_DL_DEFAULT_");
+ return getTopic(config.getDefaultTopicName());
}
+ public boolean istDefaultTopic(Topic topic) {
+ if (topic == null) {
+ return false;
+ }
+ return topic.getName().equals(config.getDefaultTopicName());
+ }
}