author    Guobiao Mo <guobiaomo@chinamobile.com>  2019-04-06 21:52:09 -0700
committer Guobiao Mo <guobiaomo@chinamobile.com>  2019-04-07 01:37:31 -0700
commit    8dc9d71a2465f5c1e4beb52c2375efe02bcde174 (patch)
tree      fa2602aca45f7222d21a38800268bcf90db2fd39 /components/datalake-handler/feeder/src/main/java/org
parent    3cb79e621ef9982d039d3770fbe02a0bed208481 (diff)
Use MariaDB to store application configurations
Issue-ID: DCAEGEN2-1400
Change-Id: I86b5bc25d84b98f7ac84b95f1690089dcebe7f0a
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java | 20
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java | 135
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java) | 13
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java | 114
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java | 83
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java | 130
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java) | 14
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java | 39
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java | 67
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java | 30
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java | 67
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java | 39
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java | 12
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java | 4
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java | 10
15 files changed, 538 insertions, 239 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 108eb4e0..1136e304 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -20,13 +20,9 @@
package org.onap.datalake.feeder.config;
-import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.ComponentScan;
import lombok.Getter;
import lombok.Setter;
@@ -42,34 +38,18 @@ import lombok.Setter;
@Setter
@SpringBootConfiguration
@ConfigurationProperties
-//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
-//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
@EnableAutoConfiguration
-//@Profile("test")
public class ApplicationConfiguration {
- private String couchbaseHost;
- private String couchbaseUser;
- private String couchbasePass;
- private String couchbaseBucket;
-
- // private int mongodbPort;
- // private String mongodbDatabase;
-
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
private long dmaapKafkaTimeout;
-// private boolean dmaapMonitorAllTopics;
private int dmaapCheckNewTopicIntervalInSec;
- //private String dmaapHostPort;
- //private Set<String> dmaapExcludeTopics;
- //private Set<String> dmaapIncludeTopics;
private int kafkaConsumerCount;
private boolean async;
- private String elasticsearchHost;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
new file mode 100644
index 00000000..c34befcc
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -0,0 +1,135 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.service.DbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages the big data storage settings. All the settings are saved in the database.
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DbController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ @Autowired
+ private DbService dbService;
+
+ //list all dbs
+ @GetMapping("/")
+ @ResponseBody
+ public Iterable<Db> list() throws IOException {
+ Iterable<Db> ret = dbRepository.findAll();
+ return ret;
+ }
+
+ //Read a db
+ //the topics are missing in the response, since we use @JsonBackReference on Db's topics;
+ //use the following method to retrieve a DB's topic list
+ @GetMapping("/{name}")
+ @ResponseBody
+ public Db getDb(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ return db;
+ }
+
+ //Read topics in a DB
+ @GetMapping("/{name}/topics")
+ @ResponseBody
+ public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ Set<Topic> topics = db.getTopics();
+ return topics;
+ }
+
+ //Update Db
+ @PutMapping("/")
+ @ResponseBody
+ public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb == null) {
+ sendError(response, 404, "Db not found: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ //create a new Db
+ @PostMapping("/")
+ @ResponseBody
+ public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb != null) {
+ sendError(response, 400, "Db already exists: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+}
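
The new /dbs endpoints can be exercised with any HTTP client. Below is a minimal client-side sketch using Spring's RestTemplate; the base URL and the field values in the Db payload are assumptions for illustration only, not part of this commit.

    import org.onap.datalake.feeder.domain.Db;
    import org.springframework.web.client.RestTemplate;

    public class DbAdminClientSketch {
        public static void main(String[] args) {
            RestTemplate rest = new RestTemplate();
            String base = "http://localhost:8080/dbs"; // assumed host/port

            // create a new storage entry (POST /dbs/)
            Db couchbase = new Db("Couchbase");
            couchbase.setHost("couchbase.example.org"); // example values, not from the commit
            couchbase.setLogin("dl");
            couchbase.setPass("dl1234");
            couchbase.setProperty1("dl-bucket");        // CouchbaseService reads the bucket name from property1
            rest.postForObject(base + "/", couchbase, Db.class);

            // read it back (GET /dbs/{name})
            Db read = rest.getForObject(base + "/{name}", Db.class, "Couchbase");
            System.out.println(read.getName() + " @ " + read.getHost());
        }
    }
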
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 2b176370..2e13e1af 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -37,8 +38,8 @@ import org.springframework.web.bind.annotation.RestController;
*/
@RestController
-@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
-public class PullController {
+@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class FeederController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -49,7 +50,7 @@ public class PullController {
* @return message that application is started
* @throws IOException
*/
- @RequestMapping("/start")
+ @GetMapping("/start")
public String start() throws IOException {
log.info("DataLake feeder starting to pull data from DMaaP...");
pullService.start();
@@ -59,7 +60,7 @@ public class PullController {
/**
* @return message that application stop process is triggered
*/
- @RequestMapping("/stop")
+ @GetMapping("/stop")
public String stop() {
pullService.shutdown();
log.info("DataLake feeder is stopped.");
@@ -68,9 +69,9 @@ public class PullController {
/**
* @return feeder status
*/
- @RequestMapping("/status")
+ @GetMapping("/status")
public String status() {
- String status = "to be impletemented";
+ String status = "Feeder is running: "+pullService.isRunning();
log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
return status;
}
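
With the switch from @RequestMapping to @GetMapping, the feeder lifecycle endpoints are plain GETs returning text/plain, and /feeder/status now reports the PullService state. A short sketch of polling it (host/port is an assumption):

    import org.springframework.web.client.RestTemplate;

    public class FeederStatusSketch {
        public static void main(String[] args) {
            RestTemplate rest = new RestTemplate();
            // expected response body, e.g. "Feeder is running: true"
            String status = rest.getForObject("http://localhost:8080/feeder/status", String.class);
            System.out.println(status);
        }
    }
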
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 25028d58..c4aec14c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -21,36 +21,44 @@ package org.onap.datalake.feeder.controller;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
+import java.util.Set;
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.TopicService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
- * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
- * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
- * used for that topic. All the settings are saved in Couchbase. topic
- * "_DL_DEFAULT_" is populated at setup by a DB script.
+ * This controller manages topic settings.
+ *
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
+ * All the settings are saved in the database.
+ * Topic "_DL_DEFAULT_" is populated at setup time by a DB script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -60,7 +68,13 @@ public class TopicController {
@Autowired
private TopicRepository topicRepository;
+
+ @Autowired
+ private TopicService topicService;
+ @Autowired
+ private DbService dbService;
+
//list all topics in DMaaP
@GetMapping("/dmaap/")
@ResponseBody
@@ -77,33 +91,45 @@ public class TopicController {
}
//Read a topic
- @GetMapping("/{name}")
+ @GetMapping("/{topicname}")
@ResponseBody
- public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
- //Topic topic = topicRepository.findFirstById(topicName);
- Optional<Topic> topic = topicRepository.findById(topicName);
- if (topic.isPresent()) {
- return topic.get();
- } else {
- return null;
- }
+ public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ return topic;
+ }
+
+ //Read DBs in a topic
+ @GetMapping("/{topicname}/dbs")
+ @ResponseBody
+ public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ return dbs;
}
//Update Topic
+ //This is not a partial update: the old topic is wiped out, and a new topic is created based on the input JSON.
+ //One exception: the old DBs are kept.
@PutMapping("/")
@ResponseBody
- public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+ public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- log.error(result.toString());
-
- return null;//TODO return binding error
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
+ return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- return null;//TODO return not found error
+ sendError(response, 404, "Topic not found "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
+ topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
@@ -112,20 +138,56 @@ public class TopicController {
//create a new Topic
@PostMapping("/")
@ResponseBody
- public Topic createTopic(Topic topic, BindingResult result) throws IOException {
-
+ public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+
if (result.hasErrors()) {
- log.error(result.toString());
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- return null;//TODO return 'already exists' error
+ sendError(response, 400, "Topic already exists "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
topicRepository.save(topic);
return topic;
}
}
+ //delete a db from the topic
+ @DeleteMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ dbs.remove(new Db(dbName));
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ //add a db to the topic
+ @PutMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+
+ Db db = dbService.getDb(dbName);
+ dbs.add(db);
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
}
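
Topic-to-DB associations are now managed through dedicated sub-resource endpoints rather than per-storage boolean flags on the topic. The sketch below wires a topic to the Elasticsearch Db row and then removes it again; the base URL, topic name and DB name are assumptions for illustration.

    import org.springframework.web.client.RestTemplate;

    public class TopicDbLinkSketch {
        public static void main(String[] args) {
            RestTemplate rest = new RestTemplate();
            String base = "http://localhost:8080/topics"; // assumed host/port

            // attach a Db to a topic (PUT /topics/{topicname}/db/{dbname})
            rest.put(base + "/{topic}/db/{db}", null, "example.topic", "Elasticsearch");

            // detach it again (DELETE /topics/{topicname}/db/{dbname})
            rest.delete(base + "/{topic}/db/{db}", "example.topic", "Elasticsearch");
        }
    }
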
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
new file mode 100644
index 00000000..bbaedadc
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -0,0 +1,83 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Domain class representing big data storage
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db")
+public class Db {
+ @Id
+ private String name;
+
+ private String host;
+ private String login;
+ private String pass;
+
+ private String property1;
+ private String property2;
+ private String property3;
+
+ @JsonBackReference
+ @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+ /*
+ @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="db_name") },
+ inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ ) */
+ protected Set<Topic> topics;
+
+ public Db() {
+ }
+
+ public Db(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Db)obj).getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+}
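
The supportElasticsearch()/supportCouchbase() checks added to Topic rely on Db's name-based equals()/hashCode(): membership in the dbs set is decided purely by name, so a freshly constructed Db("Elasticsearch") matches the persisted row. A small sketch of that behaviour (no database needed; names are examples):

    import java.util.HashSet;
    import java.util.Set;
    import org.onap.datalake.feeder.domain.Db;

    public class DbEqualitySketch {
        public static void main(String[] args) {
            Set<Db> dbs = new HashSet<>();
            Db elasticsearch = new Db("Elasticsearch");
            elasticsearch.setHost("es.example.org"); // host is irrelevant to equality
            dbs.add(elasticsearch);

            // equals()/hashCode() compare only the name, so a bare Db with the same name matches
            System.out.println(dbs.contains(new Db("Elasticsearch"))); // true
            System.out.println(dbs.contains(new Db("Druid")));         // false
        }
    }
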
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index ace33dcc..e1da4d4d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -19,39 +19,61 @@
*/
package org.onap.datalake.feeder.domain;
+import java.util.Set;
import java.util.function.Predicate;
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
import lombok.Setter;
/**
- * Domain class representing topic table in Couchbase
+ * Domain class representing a topic
*
* @author Guobiao Mo
*
*/
-@Document
@Setter
+@Getter
+@Entity
+@Table(name = "topic")
public class Topic {
- @NotNull
@Id
- private String id;//topic name
+ private String name;//topic name
- @Transient
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
//for protected Kafka topics
private String login;
private String pass;
+ //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+ @JsonBackReference
+ //@JsonManagedReference
+ @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="topic_name") },
+ inverseJoinColumns = { @JoinColumn(name="db_name") }
+ )
+ protected Set<Db> dbs;
+
/**
* indicate if we should monitor this topic
*/
@@ -60,20 +82,14 @@ public class Topic {
/**
* save raw message text
*/
+ @Column(name = "save_raw")
private Boolean saveRaw;
/**
- * true: save it to Elasticsearch false: don't save null: use default
- */
- private Boolean supportElasticsearch;
- private Boolean supportCouchbase;
- private Boolean supportDruid;
-
- /**
- * need to explicitly tell feeder the data format of the message
+ * need to explicitly tell feeder the data format of the message.
* support JSON, XML, YAML, TEXT
*/
- private DataFormat dataFormat;
+ private String dataFormat;
/**
* TTL in day
@@ -81,26 +97,24 @@ public class Topic {
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
+ @Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
//the value in the JSON with this path will be used as DB id
+ @Column(name = "message_id_path")
private String messageIdPath;
public Topic() {
}
- public Topic(String id) {
- this.id = id;
+ public Topic(String name) {
+ this.name = name;
}
- public String getId() {
- return id;
+ public boolean isDefault() {
+ return "_DL_DEFAULT_".equals(name);
}
- public void setDefaultTopic(Topic defaultTopic) {
- this.defaultTopic = defaultTopic;
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
@@ -121,7 +135,7 @@ public class Topic {
public DataFormat getDataFormat() {
if (dataFormat != null) {
- return dataFormat;
+ return DataFormat.fromString(dataFormat);
} else if (defaultTopic != null) {
return defaultTopic.getDataFormat();
} else {
@@ -148,24 +162,51 @@ public class Topic {
return is(saveRaw, Topic::isSaveRaw);
}
- public boolean isSupportElasticsearch() {
- return is(supportElasticsearch, Topic::isSupportElasticsearch);
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO avoid hard-coded DB name strings
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
}
- public boolean isSupportCouchbase() {
- return is(supportCouchbase, Topic::isSupportCouchbase);
+ public boolean supportDruid() {
+ return containDb("Druid");
}
- public boolean isSupportDruid() {
- return is(supportDruid, Topic::isSupportDruid);
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
}
- //extract DB id from a JSON attribute, TODO support multiple attributes
+ private boolean containDb(String dbName) {
+ Db db = new Db(dbName);
+
+ if(dbs!=null && dbs.contains(db)) {
+ return true;
+ }
+
+ if (defaultTopic != null) {
+ return defaultTopic.containDb(dbName);
+ } else {
+ return false;
+ }
+ }
+
+ //extract DB id from JSON attributes; supports multiple comma-separated paths
public String getMessageId(JSONObject json) {
String id = null;
if(StringUtils.isNotBlank(messageIdPath)) {
- id = json.query(messageIdPath).toString();
+ String[] paths=messageIdPath.split(",");
+
+ StringBuilder sb= new StringBuilder();
+ for(int i=0; i<paths.length; i++) {
+ if(i>0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
}
return id;
@@ -173,20 +214,17 @@ public class Topic {
@Override
public String toString() {
- return id;
+ return name;
}
- /**
- * @return the messageIdPath
- */
- public String getMessageIdPath() {
- return messageIdPath;
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Topic)obj).getName());
}
- /**
- * @param messageIdPath the messageIdPath to set
- */
- public void setMessageIdPath(String messageIdPath) {
- this.messageIdPath = messageIdPath;
+ @Override
+ public int hashCode() {
+ return name.hashCode();
}
+
}
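
getMessageId() now accepts a comma-separated list of JSON Pointer paths and joins the extracted values with '^'. A minimal sketch of the resulting id; the paths and payload are made up for illustration:

    import org.json.JSONObject;
    import org.onap.datalake.feeder.domain.Topic;

    public class MessageIdSketch {
        public static void main(String[] args) {
            Topic topic = new Topic("example.topic");
            // two JSON Pointer paths, comma separated
            topic.setMessageIdPath("/event/commonEventHeader/eventName,/event/commonEventHeader/sequence");

            JSONObject json = new JSONObject(
                "{\"event\":{\"commonEventHeader\":{\"eventName\":\"Fault_X\",\"sequence\":7}}}");

            // values are concatenated with '^' -> "Fault_X^7"
            System.out.println(topic.getMessageId(json));
        }
    }
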
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
index 220a8f76..ae03f469 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
@@ -19,10 +19,18 @@
*/
package org.onap.datalake.feeder.repository;
+import org.onap.datalake.feeder.domain.Db;
+
+import org.springframework.data.repository.CrudRepository;
+
/**
+ *
+ * Db Repository
+ *
* @author Guobiao Mo
*
- */
-public interface TopicRepositoryCustom {
- long updateTopic(String topic, Boolean state);
+ */
+
+public interface DbRepository extends CrudRepository<Db, String> {
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 37d1a669..2d9adef8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -20,48 +20,17 @@
package org.onap.datalake.feeder.repository;
import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
-import org.springframework.data.couchbase.core.query.Query;
-import org.springframework.data.couchbase.core.query.ViewIndexed;
-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.Pageable;
-
-import java.util.List;
+import org.springframework.data.repository.CrudRepository;
/**
*
- * Topic Repository interface, implementation is taken care by Spring framework.
- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl.
+ * Topic Repository
*
* @author Guobiao Mo
*
- */
-@ViewIndexed(designDoc = "topic", viewName = "all")
-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {
-/*
- Topic findFirstById(String topic);
-
- Topic findByIdAndState(String topic, boolean state);
-
- //Supports native JSON query string
- @Query("{topic:'?0'}")
- Topic findTopicById(String topic);
-
- @Query("{topic: { $regex: ?0 } })")
- List<Topic> findTopicByRegExId(String topic);
-
-
- //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);
-
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")
- Topic findByCompanyAndAreaId(String companyId, String areaId);
+ */
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")
- List<Topic> findByPhoneNumber(String telephoneNumber);
+public interface TopicRepository extends CrudRepository<Topic, String> {
- @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")
- Long countBuildings(String companyId);
- */
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
deleted file mode 100644
index 018d5b95..00000000
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DataLake
-* ================================================================================
-* Copyright 2019 China Mobile
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.datalake.feeder.repository;
-
-import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.couchbase.core.CouchbaseTemplate;
-/*
-import org.springframework.data.mongodb.MongoDbFactory;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
-import org.springframework.data.mongodb.core.query.Criteria;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
-
-import com.mongodb.WriteResult;
-import com.mongodb.client.result.UpdateResult;
-*/
-import java.util.List;
-
-/**
- * @author Guobiao Mo
- *
- */
-public class TopicRepositoryImpl implements TopicRepositoryCustom {
-
- @Autowired
- CouchbaseTemplate template;
-
- @Override
- public long updateTopic(String topic, Boolean state) {
-/*
- Query query = new Query(Criteria.where("id").is(topic));
- Update update = new Update();
- update.set("state", state);
-
- UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);
-
- if(result!=null)
- return result.getModifiedCount();
- else
- */ return 0L;
-
-
-
- }
-}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 35432587..f74829e1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -27,7 +27,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.json.JSONObject;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,18 +57,20 @@ public class CouchbaseService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private ApplicationConfiguration config;
-
+ private DbService dbService;
+
Bucket bucket;
@PostConstruct
private void init() {
// Initialize Couchbase Connection
- Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
- cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
- bucket = cluster.openBucket(config.getCouchbaseBucket());
+
+ Db couchbase = dbService.getCouchbase();
+ Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
+ cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+ bucket = cluster.openBucket(couchbase.getProperty1());
- log.info("Connect to Couchbase " + config.getCouchbaseHost());
+ log.info("Connect to Couchbase " + couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
@@ -90,15 +92,21 @@ public class CouchbaseService {
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
- String id = getId(topic.getId());
+ String id = getId(topic, json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
saveDocuments(documents);
}
-
- private String getId(String topicStr) {
+ public String getId(Topic topic, JSONObject json) {
+ //if this topic requires extracting the id from the JSON message
+ String id = topic.getMessageId(json);
+ if(id != null) {
+ return id;
+ }
+
+ String topicStr= topic.getName();
//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -106,7 +114,7 @@ public class CouchbaseService {
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- String id = topicStr +":"+ nextIdNumber.content();
+ id = topicStr +":"+ nextIdNumber.content();
return id;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
new file mode 100644
index 00000000..f0d943d3
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Dbs
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DbService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ public Db getDb(String name) {
+ Optional<Db> ret = dbRepository.findById(name);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public Db getCouchbase() {
+ return getDb("Couchbase");
+ }
+
+ public Db getElasticsearch() {
+ return getDb("Elasticsearch");
+ }
+
+ public Db getMongoDB() {
+ return getDb("MongoDB");
+ }
+
+ public Db getDruid() {
+ return getDb("Druid");
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index cbcc5f86..1f637e1a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,14 +62,18 @@ public class ElasticsearchService {
@Autowired
private ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
@PostConstruct
private void init() {
- String elasticsearchHost = config.getElasticsearchHost();
-
+ Db elasticsearch = dbService.getElasticsearch();
+ String elasticsearchHost = elasticsearch.getHost();
+
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
@@ -93,14 +99,16 @@ public class ElasticsearchService {
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
-
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
- try {
+
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(topicLower);
+
+ boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+ if(!exists){
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }catch(ElasticsearchStatusException e) {
- log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
@@ -114,7 +122,10 @@ public class ElasticsearchService {
continue;
}
}
- request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+
+ String id = topic.getMessageId(json); //id can be null
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -122,19 +133,23 @@ public class ElasticsearchService {
try {
client.bulk(request, RequestOptions.DEFAULT);
} catch (IOException e) {
- log.error( topic.getId() , e);
+ log.error( topic.getName() , e);
}
}
}
private boolean correlateClearedMessage(JSONObject json) {
- boolean found = false;
-
+ boolean found = true;
+
/*TODO
* 1. check if this is a alarm cleared message
* 2. search previous alarm message
* 3. update previous message, if success, set found=true
*/
+ //for Sonar test, remove the following
+ if(json.isNull("kkkkk")) {
+ found = false;
+ }
return found;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 3dcbd8ee..4433c8c4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -27,8 +27,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import javax.annotation.PostConstruct;
-
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,10 +56,13 @@ public class PullService {
@Autowired
private ApplicationConfiguration config;
- @PostConstruct
- private void init() {
+ /**
+ * @return the isRunning
+ */
+ public boolean isRunning() {
+ return isRunning;
}
-
+
/**
* start pulling.
*
@@ -109,6 +110,7 @@ public class PullService {
executorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("executor.awaitTermination", e);
+ Thread.currentThread().interrupt();
}
isRunning = false;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 1cd3a8a1..84e4fb7d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -152,11 +152,11 @@ public class StoreService {
}
private void saveJsons(Topic topic, List<JSONObject> jsons) {
- if (topic.isSupportCouchbase()) {
+ if (topic.supportCouchbase()) {
couchbaseService.saveJsons(topic, jsons);
}
- if (topic.isSupportElasticsearch()) {
+ if (topic.supportElasticsearch()) {
elasticsearchService.saveJsons(topic, jsons);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 9b8fabc1..4e10a365 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -34,8 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
- * application.properties for Spring setup
+ * Service for topics
*
* @author Guobiao Mo
*
@@ -68,15 +67,14 @@ public class TopicService {
return null;
}
+ //TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = new Topic(topicStr);
+ topic = getDefaultTopic();
}
-
- topic.setDefaultTopic(getDefaultTopic());
- if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) {
+ if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
elasticsearchService.ensureTableExist(topicStr);
}
return topic;
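
getEffectiveTopic() now falls back to the _DL_DEFAULT_ topic row when a topic has no configuration of its own, and per-field defaults still resolve through the defaultTopic reference (a null setting on a topic is taken from _DL_DEFAULT_, as the TopicController javadoc describes). A small sketch of that per-field fallback, assuming the private is(...) helper delegates to defaultTopic when the field is null:

    import org.onap.datalake.feeder.domain.Topic;

    public class DefaultTopicFallbackSketch {
        public static void main(String[] args) {
            Topic defaults = new Topic("_DL_DEFAULT_");
            defaults.setEnabled(true);
            defaults.setSaveRaw(false);

            Topic topic = new Topic("example.topic");
            topic.setDefaultTopic(defaults);

            // enabled/saveRaw are left null on the topic, so the defaults apply
            System.out.println(topic.isEnabled()); // true, taken from _DL_DEFAULT_
            System.out.println(topic.isSaveRaw()); // false, taken from _DL_DEFAULT_
        }
    }
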