summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java71
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java32
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java12
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties7
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java3
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java3
-rwxr-xr-xcomponents/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java11
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java14
-rw-r--r--components/datalake-handler/feeder/src/test/resources/application.properties10
15 files changed, 121 insertions, 66 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index a349d146..d674ead1 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -43,9 +43,9 @@ CREATE TABLE `map_db_topic` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-insert into db (`name`,`host`,`login`,`pass`,`database`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');
+insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');
insert into db (`name`,`host`) values ('Elasticsearch','dl_es');
-insert into db (`name`,`host`,`port`,`database`) values ('MongoDB','dl_mongodb',27017,'datalake');
+insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');
insert into db (`name`,`host`) values ('Druid','dl_druid');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 6bacf136..21ca1b70 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -53,4 +53,8 @@ public class ApplicationConfiguration {
private boolean async;
private boolean enableSSL;
+ private String timestampLabel;
+ private String rawDataLabel;
+
+ private String defaultTopicName;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
index 4bfe7aa3..2b724c72 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -192,6 +192,7 @@ public class DbController {
Db delDb = dbRepository.findByName(dbName);
if (delDb == null) {
sendError(response, 404, "Db not found: " + dbName);
+ return;
}
Set<Topic> topicRelation = delDb.getTopics();
topicRelation.clear();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 747a72c8..0869fde7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -49,18 +49,19 @@ import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.ApiOperation;
/**
- * This controller manages topic settings.
+ * This controller manages topic settings.
*
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
- * All the settings are saved in database.
- * topic "_DL_DEFAULT_" is populated at setup by a DB script.
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
+ * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
+ * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
+ * script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -70,23 +71,23 @@ public class TopicController {
@Autowired
private TopicRepository topicRepository;
-
+
@Autowired
private TopicService topicService;
@Autowired
private DbService dbService;
-
+
@GetMapping("/dmaap/")
@ResponseBody
- @ApiOperation(value="List all topic names in DMaaP.")
+ @ApiOperation(value = "List all topic names in DMaaP.")
public List<String> listDmaapTopics() throws IOException {
return dmaapService.getTopics();
}
@GetMapping("/")
@ResponseBody
- @ApiOperation(value="List all topics' settings.")
+ @ApiOperation(value = "List all topics' settings.")
public Iterable<Topic> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
return ret;
@@ -94,7 +95,7 @@ public class TopicController {
@GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value="Get a topic's settings.")
+ @ApiOperation(value = "Get a topic's settings.")
public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
return topic;
@@ -102,7 +103,7 @@ public class TopicController {
@GetMapping("/{topicName}/dbs")
@ResponseBody
- @ApiOperation(value="Get all DBs in a topic.")
+ @ApiOperation(value = "Get all DBs in a topic.")
public Set<Db> getTopicDbs(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
@@ -113,50 +114,50 @@ public class TopicController {
//One exception is that old DBs are kept
@PutMapping("/")
@ResponseBody
- @ApiOperation(value="Update a topic.")
+ @ApiOperation(value = "Update a topic.")
public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
- return null;
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
+ return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- sendError(response, 404, "Topic not found "+topic.getName());
- return null;
+ sendError(response, 404, "Topic not found " + topic.getName());
+ return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
}
-
+
@PostMapping("/")
@ResponseBody
- @ApiOperation(value="Create a new topic.")
+ @ApiOperation(value = "Create a new topic.")
public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
-
+
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: "+result.toString());
+ sendError(response, 400, "Error parsing Topic: " + result.toString());
return null;
}
Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- sendError(response, 400, "Topic already exists "+topic.getName());
+ sendError(response, 400, "Topic already exists " + topic.getName());
return null;
} else {
- if(!topic.isDefault()) {
+ if (!topicService.istDefaultTopic(topic)) {
Topic defaultTopic = topicService.getDefaultTopic();
topic.setDefaultTopic(defaultTopic);
}
-
+
topicRepository.save(topic);
return topic;
}
@@ -164,32 +165,32 @@ public class TopicController {
@DeleteMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Delete a DB from a topic.")
+ @ApiOperation(value = "Delete a DB from a topic.")
public Set<Db> deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
Set<Db> dbs = topic.getDbs();
dbs.remove(new Db(dbName));
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
@PutMapping("/{topicName}/db/{dbName}")
@ResponseBody
- @ApiOperation(value="Add a DB to a topic.")
+ @ApiOperation(value = "Add a DB to a topic.")
public Set<Db> addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
+ Set<Db> dbs = topic.getDbs();
- Db db = dbService.getDb(dbName);
+ Db db = dbService.getDb(dbName);
dbs.add(db);
-
+
topicRepository.save(topic);
- return topic.getDbs();
+ return topic.getDbs();
}
-
+
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
log.info(msg);
- response.sendError(sc, msg);
+ response.sendError(sc, msg);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index b7f89fcd..2b92e869 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -111,10 +111,6 @@ public class Topic {
this.name = name;
}
- public boolean isDefault() {
- return "_DL_DEFAULT_".equals(name);
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index a717e10a..1e5fb78b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -27,6 +27,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
@@ -57,6 +58,9 @@ public class CouchbaseService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ ApplicationConfiguration config;
+
+ @Autowired
private DbService dbService;
Bucket bucket;
@@ -73,12 +77,12 @@ public class CouchbaseService {
log.info("Connect to Couchbase {}", couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+ isReady = true;
}
catch(Exception ex)
{
isReady = false;
}
- isReady = true;
}
@PreDestroy
@@ -92,7 +96,7 @@ public class CouchbaseService {
//convert to Couchbase JsonObject from org.json JSONObject
JsonObject jsonObject = JsonObject.fromJson(json.toString());
- long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson()
+ long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
@@ -102,6 +106,7 @@ public class CouchbaseService {
documents.add(doc);
}
saveDocuments(documents);
+ log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
public String getId(Topic topic, JSONObject json) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index 3c75eb68..e9f36b2d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -125,7 +125,7 @@ public class PullThread implements Runnable {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
messages.add(Pair.of(record.timestamp(), record.value()));
- log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
}
storeService.saveMessages(partition.topic(), messages);
log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index d9fe12a7..a4f79107 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
@@ -41,16 +42,17 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
+
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
/**
* Service to store messages to varieties of DBs
*
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since AML is for config and don't see this data
+ * type in DMaaP. Do we need to support XML?
*
* @author Guobiao Mo
*
@@ -60,6 +62,9 @@ public class StoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ private ApplicationConfiguration config;
+
+ @Autowired
private TopicService topicService;
@Autowired
@@ -71,9 +76,9 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
- private Map<String, Topic> topicMap = new HashMap<>();
+ private Map<String, Topic> topicMap = new HashMap<>();
- private ObjectMapper yamlReader;
+ private ObjectMapper yamlReader;
@PostConstruct
private void init() {
@@ -112,14 +117,14 @@ public class StoreService {
String text = pair.getRight();
//for debug, to be remove
-// String topicStr = topic.getId();
-// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
+ // String topicStr = topic.getId();
+ // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+ // log.debug("{} ={}", topicStr, text);
//}
boolean storeRaw = topic.isSaveRaw();
- JSONObject json = null;
+ JSONObject json = null;
DataFormat dataFormat = topic.getDataFormat();
@@ -129,7 +134,7 @@ public class StoreService {
break;
case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
json = XML.toJSONObject(text);
- break;
+ break;
case YAML:// Do we need to support YAML?
Object obj = yamlReader.readValue(text, Object.class);
ObjectMapper jsonWriter = new ObjectMapper();
@@ -145,10 +150,11 @@ public class StoreService {
//FIXME for debug, to be remove
json.remove("_id");
json.remove("_dl_text_");
+ json.remove("_dl_type_");
- json.put("_ts", timestamp);
+ json.put(config.getTimestampLabel(), timestamp);
if (storeRaw) {
- json.put("_text", text);
+ json.put(config.getRawDataLabel(), text);
}
return json;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 7e0c7780..ea1eb4c4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.Optional;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.slf4j.Logger;
@@ -42,6 +43,9 @@ public class TopicService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
+ private ApplicationConfiguration config;
+
+ @Autowired
private TopicRepository topicRepository;
@Autowired
@@ -76,7 +80,13 @@ public class TopicService {
}
public Topic getDefaultTopic() {
- return getTopic("_DL_DEFAULT_");
+ return getTopic(config.getDefaultTopicName());
}
+ public boolean istDefaultTopic(Topic topic) {
+ if (topic == null) {
+ return false;
+ }
+ return topic.getName().equals(config.getDefaultTopicName());
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index 842d01b3..1bd53392 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -38,6 +38,13 @@ async=true
#SSL global flag, if enabled, still need to check each individual DB SSL flag
enableSSL=false
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+
#Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index ee3ceb7b..3e7d9986 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -61,6 +61,9 @@ public class ApplicationConfigurationTest {
assertTrue(config.getKafkaConsumerCount() > 0);
assertNotNull(config.isAsync());
assertNotNull(config.isEnableSSL());
+ assertNotNull(config.getDefaultTopicName());
+ assertNotNull(config.getRawDataLabel());
+ assertNotNull(config.getTimestampLabel());
}
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
index afd4503d..8be45d60 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
@@ -98,9 +98,6 @@ public class TopicTest {
Topic testTopic = new Topic("test");
testTopic.setDefaultTopic(defaultTopic);
- assertTrue(defaultTopic.isDefault());
- assertFalse(testTopic.isDefault());
-
assertTrue(testTopic.equals(new Topic("test")));
assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
index 9e40a2b1..9e1b2d99 100755
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
@@ -34,6 +34,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Topic;
import java.util.ArrayList;
@@ -98,6 +99,8 @@ public class CouchbaseServiceTest {
@Test
public void testSaveJsonsWithTopicId() {
+ ApplicationConfiguration appConfig = new ApplicationConfiguration();
+ appConfig.setTimestampLabel("datalake_ts_");
String text = "{ data: { data2 : { value : 'hello'}}}";
@@ -106,16 +109,19 @@ public class CouchbaseServiceTest {
Topic topic = new Topic("test getMessageId");
topic.setMessageIdPath("/data/data2/value");
List<JSONObject> jsons = new ArrayList<>();
- json.put("_ts", 1234);
+ json.put(appConfig.getTimestampLabel(), 1234);
jsons.add(json);
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
+ couchbaseService.config = appConfig;
couchbaseService.saveJsons(topic, jsons);
}
@Test
public void testSaveJsonsWithOutTopicId() {
+ ApplicationConfiguration appConfig = new ApplicationConfiguration();
+ appConfig.setTimestampLabel("datalake_ts_");
String text = "{ data: { data2 : { value : 'hello'}}}";
@@ -123,10 +129,11 @@ public class CouchbaseServiceTest {
Topic topic = new Topic("test getMessageId");
List<JSONObject> jsons = new ArrayList<>();
- json.put("_ts", 1234);
+ json.put(appConfig.getTimestampLabel(), 1234);
jsons.add(json);
CouchbaseService couchbaseService = new CouchbaseService();
couchbaseService.bucket = bucket;
+ couchbaseService.config = appConfig;
couchbaseService.saveJsons(topic, jsons);
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 0e6db836..99f22398 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
@@ -49,6 +51,11 @@ import org.onap.datalake.feeder.repository.TopicRepository;
@RunWith(MockitoJUnitRunner.class)
public class TopicServiceTest {
+ static String DEFAULT_TOPIC_NAME = "_DL_DEFAULT_";
+
+ @Mock
+ private ApplicationConfiguration config;
+
@Mock
private TopicRepository topicRepository;
@@ -74,9 +81,10 @@ public class TopicServiceTest {
@Test
public void testGetDefaultTopic() {
- String name = "_DL_DEFAULT_";
- when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
- assertEquals(topicService.getDefaultTopic(), new Topic(name));
+ when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
+ when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+ assertEquals(topicService.getDefaultTopic(), new Topic(DEFAULT_TOPIC_NAME));
+ assertTrue(topicService.istDefaultTopic(new Topic(DEFAULT_TOPIC_NAME)));
}
@Test(expected = IOException.class)
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index d6d98e64..b9077056 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -17,6 +17,16 @@ kafkaConsumerCount=1
#tolerate inconsistency when system crash, see PullThread.run()
async=true
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+
#Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR