From c82dafde6b3f02cccba0822a340cb9d7bed35a1c Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Tue, 23 Apr 2019 15:46:15 -0700 Subject: Move hard-coded strings to app configuration Issue-ID: DCAEGEN2-1452 Change-Id: I6577ba3b5a8f877bbaa13d0c590d6a2072331136 Signed-off-by: Guobiao Mo --- .../feeder/src/assembly/scripts/init_db.sql | 4 +- .../feeder/config/ApplicationConfiguration.java | 4 ++ .../datalake/feeder/controller/DbController.java | 1 + .../feeder/controller/TopicController.java | 71 +++++++++++----------- .../org/onap/datalake/feeder/domain/Topic.java | 4 -- .../datalake/feeder/service/CouchbaseService.java | 9 ++- .../onap/datalake/feeder/service/PullThread.java | 2 +- .../onap/datalake/feeder/service/StoreService.java | 32 ++++++---- .../onap/datalake/feeder/service/TopicService.java | 12 +++- .../src/main/resources/application.properties | 7 +++ .../config/ApplicationConfigurationTest.java | 3 + .../org/onap/datalake/feeder/domain/TopicTest.java | 3 - .../feeder/service/CouchbaseServiceTest.java | 11 +++- .../datalake/feeder/service/TopicServiceTest.java | 14 ++++- .../src/test/resources/application.properties | 10 +++ 15 files changed, 121 insertions(+), 66 deletions(-) (limited to 'components/datalake-handler/feeder') diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index a349d146..d674ead1 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -43,9 +43,9 @@ CREATE TABLE `map_db_topic` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -insert into db (`name`,`host`,`login`,`pass`,`database`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake'); +insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake'); insert into db (`name`,`host`) values ('Elasticsearch','dl_es'); -insert into db (`name`,`host`,`port`,`database`) values ('MongoDB','dl_mongodb',27017,'datalake'); +insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake'); insert into db (`name`,`host`) values ('Druid','dl_druid'); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 6bacf136..21ca1b70 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -53,4 +53,8 @@ public class ApplicationConfiguration { private boolean async; private boolean enableSSL; + private String timestampLabel; + private String rawDataLabel; + + private String defaultTopicName; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java index 4bfe7aa3..2b724c72 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java @@ -192,6 +192,7 @@ public class DbController { Db delDb = dbRepository.findByName(dbName); if (delDb == null) { sendError(response, 404, "Db not found: " + dbName); + return; } Set topicRelation = delDb.getTopics(); topicRelation.clear(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 747a72c8..0869fde7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -49,18 +49,19 @@ import org.springframework.web.bind.annotation.RestController; import io.swagger.annotations.ApiOperation; /** - * This controller manages topic settings. + * This controller manages topic settings. * - * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. - * All the settings are saved in database. - * topic "_DL_DEFAULT_" is populated at setup by a DB script. + * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's + * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings + * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB + * script. * * @author Guobiao Mo * */ @RestController -@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE}) +@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE}) public class TopicController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -70,23 +71,23 @@ public class TopicController { @Autowired private TopicRepository topicRepository; - + @Autowired private TopicService topicService; @Autowired private DbService dbService; - + @GetMapping("/dmaap/") @ResponseBody - @ApiOperation(value="List all topic names in DMaaP.") + @ApiOperation(value = "List all topic names in DMaaP.") public List listDmaapTopics() throws IOException { return dmaapService.getTopics(); } @GetMapping("/") @ResponseBody - @ApiOperation(value="List all topics' settings.") + @ApiOperation(value = "List all topics' settings.") public Iterable list() throws IOException { Iterable ret = topicRepository.findAll(); return ret; @@ -94,7 +95,7 @@ public class TopicController { @GetMapping("/{topicName}") @ResponseBody - @ApiOperation(value="Get a topic's settings.") + @ApiOperation(value = "Get a topic's settings.") public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException { Topic topic = topicService.getTopic(topicName); return topic; @@ -102,7 +103,7 @@ public class TopicController { @GetMapping("/{topicName}/dbs") @ResponseBody - @ApiOperation(value="Get all DBs in a topic.") + @ApiOperation(value = "Get all DBs in a topic.") public Set getTopicDbs(@PathVariable("topicName") String topicName) throws IOException { Topic topic = topicService.getTopic(topicName); Set dbs = topic.getDbs(); @@ -113,50 +114,50 @@ public class TopicController { //One exception is that old DBs are kept @PutMapping("/") @ResponseBody - @ApiOperation(value="Update a topic.") + @ApiOperation(value = "Update a topic.") public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { if (result.hasErrors()) { - sendError(response, 400, "Error parsing Topic: "+result.toString()); - return null; + sendError(response, 400, "Error parsing Topic: " + result.toString()); + return null; } Topic oldTopic = getTopic(topic.getName()); if (oldTopic == null) { - sendError(response, 404, "Topic not found "+topic.getName()); - return null; + sendError(response, 404, "Topic not found " + topic.getName()); + return null; } else { - if(!topic.isDefault()) { + if (!topicService.istDefaultTopic(topic)) { Topic defaultTopic = topicService.getDefaultTopic(); topic.setDefaultTopic(defaultTopic); } - + topic.setDbs(oldTopic.getDbs()); topicRepository.save(topic); return topic; } } - + @PostMapping("/") @ResponseBody - @ApiOperation(value="Create a new topic.") + @ApiOperation(value = "Create a new topic.") public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { - + if (result.hasErrors()) { - sendError(response, 400, "Error parsing Topic: "+result.toString()); + sendError(response, 400, "Error parsing Topic: " + result.toString()); return null; } Topic oldTopic = getTopic(topic.getName()); if (oldTopic != null) { - sendError(response, 400, "Topic already exists "+topic.getName()); + sendError(response, 400, "Topic already exists " + topic.getName()); return null; } else { - if(!topic.isDefault()) { + if (!topicService.istDefaultTopic(topic)) { Topic defaultTopic = topicService.getDefaultTopic(); topic.setDefaultTopic(defaultTopic); } - + topicRepository.save(topic); return topic; } @@ -164,32 +165,32 @@ public class TopicController { @DeleteMapping("/{topicName}/db/{dbName}") @ResponseBody - @ApiOperation(value="Delete a DB from a topic.") + @ApiOperation(value = "Delete a DB from a topic.") public Set deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException { Topic topic = topicService.getTopic(topicName); Set dbs = topic.getDbs(); dbs.remove(new Db(dbName)); - + topicRepository.save(topic); - return topic.getDbs(); + return topic.getDbs(); } @PutMapping("/{topicName}/db/{dbName}") @ResponseBody - @ApiOperation(value="Add a DB to a topic.") + @ApiOperation(value = "Add a DB to a topic.") public Set addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException { Topic topic = topicService.getTopic(topicName); - Set dbs = topic.getDbs(); + Set dbs = topic.getDbs(); - Db db = dbService.getDb(dbName); + Db db = dbService.getDb(dbName); dbs.add(db); - + topicRepository.save(topic); - return topic.getDbs(); + return topic.getDbs(); } - + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { log.info(msg); - response.sendError(sc, msg); + response.sendError(sc, msg); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index b7f89fcd..2b92e869 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -111,10 +111,6 @@ public class Topic { this.name = name; } - public boolean isDefault() { - return "_DL_DEFAULT_".equals(name); - } - public boolean isEnabled() { return is(enabled, Topic::isEnabled); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index a717e10a..1e5fb78b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -27,6 +27,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; @@ -56,6 +57,9 @@ public class CouchbaseService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + @Autowired + ApplicationConfiguration config; + @Autowired private DbService dbService; @@ -73,12 +77,12 @@ public class CouchbaseService { log.info("Connect to Couchbase {}", couchbase.getHost()); // Create a N1QL Primary Index (but ignore if it exists) bucket.bucketManager().createN1qlPrimaryIndex(true, false); + isReady = true; } catch(Exception ex) { isReady = false; } - isReady = true; } @PreDestroy @@ -92,7 +96,7 @@ public class CouchbaseService { //convert to Couchbase JsonObject from org.json JSONObject JsonObject jsonObject = JsonObject.fromJson(json.toString()); - long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson() + long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() //setup TTL int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second @@ -102,6 +106,7 @@ public class CouchbaseService { documents.add(doc); } saveDocuments(documents); + log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } public String getId(Topic topic, JSONObject json) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index 3c75eb68..e9f36b2d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -125,7 +125,7 @@ public class PullThread implements Runnable { List> partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) { messages.add(Pair.of(record.timestamp(), record.value())); - log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); } storeService.saveMessages(partition.topic(), messages); log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index d9fe12a7..a4f79107 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Map; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.json.JSONException; import org.json.JSONObject; import org.json.XML; +import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.enumeration.DataFormat; import org.slf4j.Logger; @@ -41,16 +42,17 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - + import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; /** * Service to store messages to varieties of DBs * - * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML? + * comment out YAML support, since AML is for config and don't see this data + * type in DMaaP. Do we need to support XML? * * @author Guobiao Mo * @@ -59,6 +61,9 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; public class StoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + @Autowired + private ApplicationConfiguration config; + @Autowired private TopicService topicService; @@ -71,9 +76,9 @@ public class StoreService { @Autowired private ElasticsearchService elasticsearchService; - private Map topicMap = new HashMap<>(); + private Map topicMap = new HashMap<>(); - private ObjectMapper yamlReader; + private ObjectMapper yamlReader; @PostConstruct private void init() { @@ -112,14 +117,14 @@ public class StoreService { String text = pair.getRight(); //for debug, to be remove -// String topicStr = topic.getId(); -// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) { - // log.debug("{} ={}", topicStr, text); + // String topicStr = topic.getId(); + // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) { + // log.debug("{} ={}", topicStr, text); //} boolean storeRaw = topic.isSaveRaw(); - JSONObject json = null; + JSONObject json = null; DataFormat dataFormat = topic.getDataFormat(); @@ -129,7 +134,7 @@ public class StoreService { break; case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON json = XML.toJSONObject(text); - break; + break; case YAML:// Do we need to support YAML? Object obj = yamlReader.readValue(text, Object.class); ObjectMapper jsonWriter = new ObjectMapper(); @@ -145,10 +150,11 @@ public class StoreService { //FIXME for debug, to be remove json.remove("_id"); json.remove("_dl_text_"); + json.remove("_dl_type_"); - json.put("_ts", timestamp); + json.put(config.getTimestampLabel(), timestamp); if (storeRaw) { - json.put("_text", text); + json.put(config.getRawDataLabel(), text); } return json; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 7e0c7780..ea1eb4c4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.Optional; +import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.TopicRepository; import org.slf4j.Logger; @@ -41,6 +42,9 @@ public class TopicService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + @Autowired + private ApplicationConfiguration config; + @Autowired private TopicRepository topicRepository; @@ -76,7 +80,13 @@ public class TopicService { } public Topic getDefaultTopic() { - return getTopic("_DL_DEFAULT_"); + return getTopic(config.getDefaultTopicName()); } + public boolean istDefaultTopic(Topic topic) { + if (topic == null) { + return false; + } + return topic.getName().equals(config.getDefaultTopicName()); + } } diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index 842d01b3..1bd53392 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -38,6 +38,13 @@ async=true #SSL global flag, if enabled, still need to check each individual DB SSL flag enableSSL=false +#names for extra fields that DL adds to each record +timestampLabel=datalake_ts_ +rawDataLabel=datalake_text_ + +defaultTopicName=_DL_DEFAULT_ + + #Logging logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java index ee3ceb7b..3e7d9986 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -61,6 +61,9 @@ public class ApplicationConfigurationTest { assertTrue(config.getKafkaConsumerCount() > 0); assertNotNull(config.isAsync()); assertNotNull(config.isEnableSSL()); + assertNotNull(config.getDefaultTopicName()); + assertNotNull(config.getRawDataLabel()); + assertNotNull(config.getTimestampLabel()); } } diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java index afd4503d..8be45d60 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -98,9 +98,6 @@ public class TopicTest { Topic testTopic = new Topic("test"); testTopic.setDefaultTopic(defaultTopic); - assertTrue(defaultTopic.isDefault()); - assertFalse(testTopic.isDefault()); - assertTrue(testTopic.equals(new Topic("test"))); assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java index 9e40a2b1..9e1b2d99 100755 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Topic; import java.util.ArrayList; @@ -98,6 +99,8 @@ public class CouchbaseServiceTest { @Test public void testSaveJsonsWithTopicId() { + ApplicationConfiguration appConfig = new ApplicationConfiguration(); + appConfig.setTimestampLabel("datalake_ts_"); String text = "{ data: { data2 : { value : 'hello'}}}"; @@ -106,16 +109,19 @@ public class CouchbaseServiceTest { Topic topic = new Topic("test getMessageId"); topic.setMessageIdPath("/data/data2/value"); List jsons = new ArrayList<>(); - json.put("_ts", 1234); + json.put(appConfig.getTimestampLabel(), 1234); jsons.add(json); CouchbaseService couchbaseService = new CouchbaseService(); couchbaseService.bucket = bucket; + couchbaseService.config = appConfig; couchbaseService.saveJsons(topic, jsons); } @Test public void testSaveJsonsWithOutTopicId() { + ApplicationConfiguration appConfig = new ApplicationConfiguration(); + appConfig.setTimestampLabel("datalake_ts_"); String text = "{ data: { data2 : { value : 'hello'}}}"; @@ -123,10 +129,11 @@ public class CouchbaseServiceTest { Topic topic = new Topic("test getMessageId"); List jsons = new ArrayList<>(); - json.put("_ts", 1234); + json.put(appConfig.getTimestampLabel(), 1234); jsons.add(json); CouchbaseService couchbaseService = new CouchbaseService(); couchbaseService.bucket = bucket; + couchbaseService.config = appConfig; couchbaseService.saveJsons(topic, jsons); } diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java index 0e6db836..99f22398 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java @@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.TopicRepository; @@ -49,6 +51,11 @@ import org.onap.datalake.feeder.repository.TopicRepository; @RunWith(MockitoJUnitRunner.class) public class TopicServiceTest { + static String DEFAULT_TOPIC_NAME = "_DL_DEFAULT_"; + + @Mock + private ApplicationConfiguration config; + @Mock private TopicRepository topicRepository; @@ -74,9 +81,10 @@ public class TopicServiceTest { @Test public void testGetDefaultTopic() { - String name = "_DL_DEFAULT_"; - when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name))); - assertEquals(topicService.getDefaultTopic(), new Topic(name)); + when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME))); + when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME); + assertEquals(topicService.getDefaultTopic(), new Topic(DEFAULT_TOPIC_NAME)); + assertTrue(topicService.istDefaultTopic(new Topic(DEFAULT_TOPIC_NAME))); } @Test(expected = IOException.class) diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties index d6d98e64..b9077056 100644 --- a/components/datalake-handler/feeder/src/test/resources/application.properties +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -17,6 +17,16 @@ kafkaConsumerCount=1 #tolerate inconsistency when system crash, see PullThread.run() async=true +#SSL global flag, if enabled, still need to check each individual DB SSL flag +enableSSL=false + +#names for extra fields that DL adds to each record +timestampLabel=datalake_ts_ +rawDataLabel=datalake_text_ + +defaultTopicName=_DL_DEFAULT_ + + #Logging logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR -- cgit 1.2.3-korg