summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java168
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/PostReturnBody.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java59
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java37
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java51
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java62
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java115
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java92
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java14
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java7
11 files changed, 408 insertions, 208 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 0869fde7..7583684a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -20,6 +20,7 @@
package org.onap.datalake.feeder.controller;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -27,6 +28,9 @@ import javax.servlet.http.HttpServletResponse;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
@@ -57,11 +61,11 @@ import io.swagger.annotations.ApiOperation;
* script.
*
* @author Guobiao Mo
- *
+ * @contributor Kate Hsuan @ QCT
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -73,6 +77,9 @@ public class TopicController {
private TopicRepository topicRepository;
@Autowired
+ private DbRepository dbRepository;
+
+ @Autowired
private TopicService topicService;
@Autowired
@@ -85,112 +92,117 @@ public class TopicController {
return dmaapService.getTopics();
}
- @GetMapping("/")
+ @GetMapping("")
@ResponseBody
- @ApiOperation(value = "List all topics' settings.")
- public Iterable<Topic> list() throws IOException {
+ @ApiOperation(value="List all topics")
+ public List<String> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
- return ret;
+ List<String> retString = new ArrayList<>();
+ for(Topic item : ret)
+ {
+ if(!topicService.istDefaultTopic(item))
+ retString.add(item.getName());
+ }
+ return retString;
}
- @GetMapping("/{topicName}")
+ @PostMapping("")
@ResponseBody
- @ApiOperation(value = "Get a topic's settings.")
- public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
- Topic topic = topicService.getTopic(topicName);
- return topic;
+ @ApiOperation(value="Create a new topic.")
+ public PostReturnBody<TopicConfig> createTopic(@RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
+ return null;
+ }
+ Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ if (oldTopic != null) {
+ sendError(response, 400, "Topic already exists "+topicConfig.getName());
+ return null;
+ } else {
+ PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
+ Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
+ if(wTopic.getTtl() == 0)
+ wTopic.setTtl(3650);
+ topicRepository.save(wTopic);
+ mkPostReturnBody(retBody, 200, wTopic);
+ return retBody;
+ }
}
- @GetMapping("/{topicName}/dbs")
+ @GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value = "Get all DBs in a topic.")
- public Set<Db> getTopicDbs(@PathVariable("topicName") String topicName) throws IOException {
+ @ApiOperation(value="Get a topic's settings.")
+ public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException {
Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
- return dbs;
+ if(topic == null) {
+ sendError(response, 404, "Topic not found");
+ }
+ TopicConfig tConfig = new TopicConfig();
+ mkReturnMessage(topic, tConfig);
+ return tConfig;
}
- //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
+ //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
//One exception is that old DBs are kept
- @PutMapping("/")
+ @PutMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value = "Update a topic.")
- public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+ @ApiOperation(value="Update a topic.")
+ public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: " + result.toString());
- return null;
- }
-
- Topic oldTopic = getTopic(topic.getName());
- if (oldTopic == null) {
- sendError(response, 404, "Topic not found " + topic.getName());
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
- } else {
- if (!topicService.istDefaultTopic(topic)) {
- Topic defaultTopic = topicService.getDefaultTopic();
- topic.setDefaultTopic(defaultTopic);
- }
-
- topic.setDbs(oldTopic.getDbs());
- topicRepository.save(topic);
- return topic;
}
- }
-
- @PostMapping("/")
- @ResponseBody
- @ApiOperation(value = "Create a new topic.")
- public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
- if (result.hasErrors()) {
- sendError(response, 400, "Error parsing Topic: " + result.toString());
+ if(!topicName.equals(topicConfig.getName()))
+ {
+ sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName());
return null;
}
- Topic oldTopic = getTopic(topic.getName());
- if (oldTopic != null) {
- sendError(response, 400, "Topic already exists " + topic.getName());
+ Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ if (oldTopic == null) {
+ sendError(response, 404, "Topic not found "+topicConfig.getName());
return null;
} else {
- if (!topicService.istDefaultTopic(topic)) {
- Topic defaultTopic = topicService.getDefaultTopic();
- topic.setDefaultTopic(defaultTopic);
- }
-
- topicRepository.save(topic);
- return topic;
+ PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
+ topicService.fillTopicConfiguration(topicConfig, oldTopic);
+ topicRepository.save(oldTopic);
+ mkPostReturnBody(retBody, 200, oldTopic);
+ return retBody;
}
}
- @DeleteMapping("/{topicName}/db/{dbName}")
- @ResponseBody
- @ApiOperation(value = "Delete a DB from a topic.")
- public Set<Db> deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
- Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
- dbs.remove(new Db(dbName));
-
- topicRepository.save(topic);
- return topic.getDbs();
+ private void mkReturnMessage(Topic topic, TopicConfig tConfig)
+ {
+ tConfig.setName(topic.getName());
+ tConfig.setEnable(topic.getEnabled());
+ if(topic.getDataFormat() != null)
+ tConfig.setData_format(topic.getDataFormat().toString());
+ tConfig.setSave_raw(topic.getSaveRaw());
+ tConfig.setCorrelated_clearred_message((topic.getCorrelateClearedMessage() == null) ? topic.getCorrelateClearedMessage() : false);
+ tConfig.setMessage_id_path(topic.getMessageIdPath());
+ tConfig.setTtl(topic.getTtl());
+ Set<Db> topicDb = topic.getDbs();
+ List<String> dbList = new ArrayList<>();
+ for(Db item: topicDb)
+ {
+ dbList.add(item.getName());
+ }
+ tConfig.setSinkdbs(dbList);
}
- @PutMapping("/{topicName}/db/{dbName}")
- @ResponseBody
- @ApiOperation(value = "Add a DB to a topic.")
- public Set<Db> addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
- Topic topic = topicService.getTopic(topicName);
- Set<Db> dbs = topic.getDbs();
-
- Db db = dbService.getDb(dbName);
- dbs.add(db);
-
- topicRepository.save(topic);
- return topic.getDbs();
+ private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
+ {
+ TopicConfig retTopic = new TopicConfig();
+ retBody.setStatusCode(statusCode);
+ mkReturnMessage(topic, retTopic);
+ retBody.setReturnBody(retTopic);
}
-
+
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
log.info(msg);
- response.sendError(sc, msg);
+ response.sendError(sc, msg);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java
index 63de2196..e9d9ba8a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/DbConfig.java
@@ -19,15 +19,6 @@
*/
package org.onap.datalake.feeder.controller.domain;
-import java.util.Set;
-
-import javax.persistence.CascadeType;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.ManyToMany;
-import javax.persistence.Table;
-
-import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/PostReturnBody.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/PostReturnBody.java
index 107f494d..84bfdd76 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/PostReturnBody.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/PostReturnBody.java
@@ -25,7 +25,7 @@ import lombok.Setter;
/**
- * Unified POST return format
+ * Unified POST/PUT return format
* {
* statusCode: int,
* message: {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java
new file mode 100644
index 00000000..9e538626
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 QCT
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller.domain;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * JSON request body for Topic manipulation.
+ *
+ * @author Kate Hsuan
+ *
+ */
+
+@Getter
+@Setter
+
+public class TopicConfig {
+
+ private String name;
+ private String login;
+ private String password;
+ private List<String> sinkdbs;
+ private boolean enable;
+ private boolean save_raw;
+ private String data_format;
+ private int ttl;
+ private boolean correlated_clearred_message;
+ private String message_id_path;
+
+
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 2b92e869..4273c896 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -54,14 +54,15 @@ import lombok.Setter;
@Table(name = "topic")
public class Topic {
@Id
+ @Column(name="`name`")
private String name;//topic name
- @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
- @JoinColumn(name = "default_topic", nullable = true)
- private Topic defaultTopic;
- //for protected Kafka topics
+ //for protected Kafka topics
+ @Column(name = "`login`")
private String login;
+
+ @Column(name = "`pass`")
private String pass;
//@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
@@ -77,18 +78,20 @@ public class Topic {
/**
* indicate if we should monitor this topic
*/
+ @Column(name="`enabled`")
private Boolean enabled;
/**
* save raw message text
*/
- @Column(name = "save_raw")
+ @Column(name = "`save_raw`")
private Boolean saveRaw;
/**
* need to explicitly tell feeder the data format of the message.
* support JSON, XML, YAML, TEXT
*/
+ @Column(name="`data_format`")
private String dataFormat;
/**
@@ -97,11 +100,11 @@ public class Topic {
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
- @Column(name = "correlate_cleared_message")
+ @Column(name = "`correlate_cleared_message`")
private Boolean correlateClearedMessage;
//paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
- @Column(name = "message_id_path")
+ @Column(name = "`message_id_path`")
private String messageIdPath;
public Topic() {
@@ -111,6 +114,10 @@ public class Topic {
this.name = name;
}
+ public boolean isDefault() {
+ return "_DL_DEFAULT_".equals(name);
+ }
+
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
@@ -122,9 +129,7 @@ public class Topic {
public int getTtl() {
if (ttl != null) {
return ttl;
- } else if (defaultTopic != null) {
- return defaultTopic.getTtl();
- } else {
+ } else {
return 3650;//default to 10 years for safe
}
}
@@ -132,9 +137,7 @@ public class Topic {
public DataFormat getDataFormat() {
if (dataFormat != null) {
return DataFormat.fromString(dataFormat);
- } else if (defaultTopic != null) {
- return defaultTopic.getDataFormat();
- } else {
+ } else {
return null;
}
}
@@ -147,9 +150,7 @@ public class Topic {
private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
if (b != null) {
return b;
- } else if (defaultTopic != null) {
- return pre.test(defaultTopic);
- } else {
+ } else {
return defaultValue;
}
}
@@ -179,10 +180,6 @@ public class Topic {
if (dbs != null && dbs.contains(db)) {
return true;
- }
-
- if (defaultTopic != null) {
- return defaultTopic.containDb(dbName);
} else {
return false;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index e9f36b2d..ce671a90 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -48,7 +48,7 @@ import org.springframework.stereotype.Service;
/**
* Thread that pulls messages from DMaaP and save them to Big Data DBs
- *
+ *
* @author Guobiao Mo
*
*/
@@ -65,11 +65,11 @@ public class PullThread implements Runnable {
@Autowired
private ApplicationConfiguration config;
-
+
private final Logger log = LoggerFactory.getLogger(this.getClass());
private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
- private int id;
+ private int id;
private final AtomicBoolean active = new AtomicBoolean(false);
private boolean async;
@@ -112,33 +112,34 @@ public class PullThread implements Runnable {
List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
+
consumer.subscribe(topics, rebalanceListener);
while (active.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
+ if (records != null) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
}
} catch (Exception e) {
log.error("Puller {} run(): exception={}", id, e.getMessage());
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index ea1eb4c4..3acbaf1d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -21,10 +21,17 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
+import org.checkerframework.checker.units.qual.A;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +57,10 @@ public class TopicService {
@Autowired
private ElasticsearchService elasticsearchService;
+
+
+ @Autowired
+ private DbRepository dbRepository;
public Topic getEffectiveTopic(String topicStr) {
try {
@@ -59,13 +70,14 @@ public class TopicService {
}
return null;
}
-
+
//TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
topic = new Topic(topicStr);
- topic.setDefaultTopic(getDefaultTopic());
+ topicRepository.save(topic);
+ //topic.setDefaultTopic(getDefaultTopic());
}
if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
@@ -89,4 +101,50 @@ public class TopicService {
}
return topic.getName().equals(config.getDefaultTopicName());
}
+
+ public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic)
+ {
+ fillTopic(tConfig, wTopic);
+ }
+
+ public Topic fillTopicConfiguration(TopicConfig tConfig)
+ {
+ Topic topic = new Topic();
+ fillTopic(tConfig, topic);
+ return topic;
+ }
+
+ private void fillTopic(TopicConfig tConfig, Topic topic)
+ {
+ Set<Db> relateDb = new HashSet<>();
+ topic.setName(tConfig.getName());
+ topic.setLogin(tConfig.getLogin());
+ topic.setPass(tConfig.getPassword());
+ topic.setEnabled(tConfig.isEnable());
+ topic.setSaveRaw(tConfig.isSave_raw());
+ topic.setTtl(tConfig.getTtl());
+ topic.setCorrelateClearedMessage(tConfig.isCorrelated_clearred_message());
+ topic.setDataFormat(tConfig.getData_format());
+ topic.setMessageIdPath(tConfig.getMessage_id_path());
+
+ if(tConfig.getSinkdbs() != null) {
+ for (String item : tConfig.getSinkdbs()) {
+ Db sinkdb = dbRepository.findByName(item);
+ if (sinkdb != null) {
+ relateDb.add(sinkdb);
+ }
+ }
+ if(relateDb.size() > 0)
+ topic.setDbs(relateDb);
+ else if(relateDb.size() == 0)
+ {
+ topic.getDbs().clear();
+ }
+ }else
+ {
+ topic.setDbs(relateDb);
+ }
+
+ }
+
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
new file mode 100644
index 00000000..713d8b19
--- /dev/null
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : DATALAKE
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.datalake.feeder.controller;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.PullService;
+import org.onap.datalake.feeder.service.PullThread;
+import org.springframework.context.ApplicationContext;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+public class FeederControllerTest {
+
+ @InjectMocks
+ private PullService pullService1;
+
+ @Mock
+ private ApplicationConfiguration config;
+
+ @Mock
+ private ApplicationContext context;
+
+ @Mock
+ private DmaapService dmaapService1;
+
+ @Mock
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ @Before
+ public void setupTest() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ private void setAccessPrivateFields(FeederController feederController) throws NoSuchFieldException,
+ IllegalAccessException {
+ Field pullService = feederController.getClass().getDeclaredField("pullService");
+ pullService.setAccessible(true);
+ pullService.set(feederController, pullService1);
+ }
+
+ @Test
+ public void testStart() throws IOException, NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ PullService pullService2 = new PullService();
+ Field applicationConfig = pullService2.getClass().getDeclaredField("config");
+ applicationConfig.setAccessible(true);
+ applicationConfig.set(pullService2, config);
+ Field applicationContext = pullService2.getClass().getDeclaredField("context");
+ applicationContext.setAccessible(true);
+ applicationContext.set(pullService2, context);
+ when(config.getKafkaConsumerCount()).thenReturn(1);
+ PullThread pullThread = new PullThread(1);
+ Field dmaapService = pullThread.getClass().getDeclaredField("dmaapService");
+ dmaapService.setAccessible(true);
+ dmaapService.set(pullThread, dmaapService1);
+ Field kafkaConsumer1 = pullThread.getClass().getDeclaredField("consumer");
+ kafkaConsumer1.setAccessible(true);
+ kafkaConsumer1.set(pullThread, kafkaConsumer);
+ applicationConfig = pullThread.getClass().getDeclaredField("config");
+ applicationConfig.setAccessible(true);
+ applicationConfig.set(pullThread, config);
+ when(context.getBean(PullThread.class, 0)).thenReturn(pullThread);
+ ConsumerRecords<String, String> records = ConsumerRecords.empty();
+ when(kafkaConsumer.poll(2)).thenReturn(records);
+ String start = feederController.start();
+ assertEquals("DataLake feeder is running.", start);
+ }
+
+ @Test
+ public void testStop() throws NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ String stop = feederController.stop();
+ assertEquals("DataLake feeder is stopped.", stop);
+ }
+
+ @Test
+ public void testStatus() throws NoSuchFieldException, IllegalAccessException {
+ FeederController feederController = new FeederController();
+ setAccessPrivateFields(feederController);
+ String status = feederController.status();
+ assertEquals("Feeder is running: false", status);
+ }
+}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
index a770f50f..775bcc3b 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
@@ -27,6 +27,8 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.controller.domain.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
@@ -61,6 +63,10 @@ public class TopicControllerTest {
@Mock
private TopicRepository topicRepository;
+ @Mock
+
+ private TopicService topicServiceMock;
+
@InjectMocks
private TopicService topicService1;
@@ -95,79 +101,51 @@ public class TopicControllerTest {
}
@Test
+ public void testListTopic() throws IOException, NoSuchFieldException, IllegalAccessException{
+ TopicController topicController = new TopicController();
+ setAccessPrivateFields(topicController);
+ }
+
+ @Test
public void testCreateTopic() throws IOException, NoSuchFieldException, IllegalAccessException {
TopicController topicController = new TopicController();
setAccessPrivateFields(topicController);
- when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
- when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
- Topic topicName = topicController.createTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(new Topic("a"), topicName);
+ //when(topicRepository.findById("ab")).thenReturn(Optional.of(new Topic("ab")));
+ // when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+ PostReturnBody<TopicConfig> postTopic = topicController.createTopic(new TopicConfig(), mockBindingResult, httpServletResponse);
+ assertEquals(postTopic.getStatusCode(), 200);
when(mockBindingResult.hasErrors()).thenReturn(true);
- topicName = topicController.createTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(null, topicName);
+ PostReturnBody<TopicConfig> topicConfig= topicController.createTopic(new TopicConfig(), mockBindingResult, httpServletResponse);
+ assertEquals(null, topicConfig);
when(mockBindingResult.hasErrors()).thenReturn(false);
- Topic a = new Topic("a");
- a.setName("a");
- when(topicRepository.findById("a")).thenReturn(Optional.of(a));
- topicName = topicController.createTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(null, topicName);
+ TopicConfig a = new TopicConfig();
+ a.setName(DEFAULT_TOPIC_NAME);
+ when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
+ PostReturnBody<TopicConfig> postTopic2= topicController.createTopic(a, mockBindingResult, httpServletResponse);
+ assertEquals(null, postTopic2);
}
@Test
public void testUpdateTopic() throws IOException, NoSuchFieldException, IllegalAccessException {
TopicController topicController = new TopicController();
setAccessPrivateFields(topicController);
- Topic topicName = topicController.updateTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(null, topicName);
+ PostReturnBody<TopicConfig> postTopic = topicController.updateTopic("a", new TopicConfig(), mockBindingResult, httpServletResponse);
+ assertEquals(null, postTopic);
Topic a = new Topic("a");
a.setName("a");
when(topicRepository.findById("a")).thenReturn(Optional.of(a));
- topicName = topicController.updateTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(new Topic("a"), topicName);
+ TopicConfig ac = new TopicConfig();
+ ac.setName("a");
+ ac.setEnable(true);
+ PostReturnBody<TopicConfig> postConfig1 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
+ assertEquals(200, postConfig1.getStatusCode());
+ TopicConfig ret = postConfig1.getReturnBody();
+ assertEquals("a", ret.getName());
+ assertEquals(true, ret.isEnable());
when(mockBindingResult.hasErrors()).thenReturn(true);
- topicName = topicController.updateTopic(new Topic("a"), mockBindingResult, httpServletResponse);
- assertEquals(null, topicName);
-
- ArrayList<Topic> topics = new ArrayList<>();
- topics.add(a);
- when(topicRepository.findAll()).thenReturn(topics);
- Iterable<Topic> list = topicController.list();
- for (Topic newTopic : list) {
- assertEquals(a, newTopic);
- }
- }
+ PostReturnBody<TopicConfig> postConfig2 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
+ assertEquals(null, postConfig2);
- @Test
- public void testAddDb() throws NoSuchFieldException, IllegalAccessException, IOException {
- TopicController topicController = new TopicController();
- setAccessPrivateFields(topicController);
- String dbName = "Elecsticsearch";
- String name = "a";
- Topic topic = new Topic(name);
- topic.setEnabled(true);
- Set<Db> dbSet = new HashSet<>();
- dbSet.add(new Db(dbName));
- topic.setDbs(dbSet);
-
- when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
- topicController.addDb("a", dbName, httpServletResponse);
- topicController.deleteDb("a", dbName, httpServletResponse);
- }
-
- @Test
- public void testGetTopicDbs() throws NoSuchFieldException, IllegalAccessException, IOException {
- TopicController topicController = new TopicController();
- setAccessPrivateFields(topicController);
- String dbName = "Elecsticsearch";
- String name = "a";
- Topic topic = new Topic(name);
- topic.setEnabled(true);
- Set<Db> dbSet = new HashSet<>();
- dbSet.add(new Db(dbName));
- topic.setDbs(dbSet);
-
- when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
- topicController.getTopicDbs("a");
}
@Test
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
index 8be45d60..b583473a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
@@ -69,19 +69,16 @@ public class TopicTest {
Topic defaultTopic = new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
- testTopic.setDefaultTopic(defaultTopic);
assertEquals(3650, testTopic.getTtl());
defaultTopic.setTtl(20);
- assertEquals(20, testTopic.getTtl());
- topic.setDefaultTopic(new Topic("defaultTopic"));
+ assertEquals(20, defaultTopic.getTtl());
topic.setLogin("root");
topic.setPass("root123");
topic.setEnabled(true);
topic.setSaveRaw(true);
topic.setCorrelateClearedMessage(true);
topic.setMessageIdPath("/data/data2/value");
- assertTrue("defaultTopic".equals(topic.getDefaultTopic().toString()));
assertTrue("root".equals(topic.getLogin()));
assertTrue("root123".equals(topic.getPass()));
assertFalse("true".equals(topic.getEnabled()));
@@ -96,14 +93,13 @@ public class TopicTest {
public void testIs() {
Topic defaultTopic = new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
- testTopic.setDefaultTopic(defaultTopic);
assertTrue(testTopic.equals(new Topic("test")));
assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
defaultTopic.setDbs(new HashSet<>());
defaultTopic.getDbs().add(new Db("Elasticsearch"));
- assertTrue(testTopic.supportElasticsearch());
+ assertTrue(defaultTopic.supportElasticsearch());
assertFalse(testTopic.supportCouchbase());
assertFalse(testTopic.supportDruid());
assertFalse(testTopic.supportMongoDB());
@@ -116,9 +112,9 @@ public class TopicTest {
defaultTopic.setDataFormat("XML");
defaultTopic.setEnabled(true);
defaultTopic.setSaveRaw(true);
- assertTrue(testTopic.isCorrelateClearedMessage());
- assertTrue(testTopic.isEnabled());
- assertTrue(testTopic.isSaveRaw());
+ assertTrue(defaultTopic.isCorrelateClearedMessage());
+ assertTrue(defaultTopic.isEnabled());
+ assertTrue(defaultTopic.isSaveRaw());
assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 99f22398..8b25ec53 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -79,13 +79,6 @@ public class TopicServiceTest {
assertNull(topicService.getTopic(name));
}
- @Test
- public void testGetDefaultTopic() {
- when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
- when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
- assertEquals(topicService.getDefaultTopic(), new Topic(DEFAULT_TOPIC_NAME));
- assertTrue(topicService.istDefaultTopic(new Topic(DEFAULT_TOPIC_NAME)));
- }
@Test(expected = IOException.class)
public void testGetEffectiveTopic() throws IOException {