diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2019-04-08 16:23:35 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-04-08 16:23:35 +0000 |
commit | f4394959f7803c351543873d038af48eb91075f8 (patch) | |
tree | 7d0e7f8a3a4f372b1f005319a0e45bc132b2bcf4 /components | |
parent | 3601ddf5c10e2a0dfcbbf854fd40cfbbc14074f0 (diff) | |
parent | 8dc9d71a2465f5c1e4beb52c2375efe02bcde174 (diff) |
Merge "Use MariaDB to store application configurations"
Diffstat (limited to 'components')
32 files changed, 930 insertions, 307 deletions
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index f88baf36..5b47a245 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -10,13 +10,19 @@ <version>1.0.0-SNAPSHOT</version> </parent> - <groupId>org.onap.datalake</groupId> + <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId> <artifactId>feeder</artifactId> <packaging>jar</packaging> <name>DataLake Feeder</name> <dependencies> + + <dependency> + <groupId>org.mariadb.jdbc</groupId> + <artifactId>mariadb-java-client</artifactId> + </dependency> + <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> @@ -42,7 +48,12 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> - + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-data-jpa</artifactId> + </dependency> + <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-couchbase</artifactId> diff --git a/components/datalake-handler/feeder/src/assembly/docker-compose.yml b/components/datalake-handler/feeder/src/assembly/docker-compose.yml deleted file mode 100644 index 7ca466b6..00000000 --- a/components/datalake-handler/feeder/src/assembly/docker-compose.yml +++ /dev/null @@ -1,10 +0,0 @@ -version: '2' -services: - - datalake: - image: moguobiao/datalake-storage - container_name: datalake-storage - environment: - - no-needed-dmaapHost=10.0.2.15:3904 - ports: - - "1680:1680" diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql new file mode 100644 index 00000000..2185320a --- /dev/null +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -0,0 +1,54 @@ +create database datalake;
+use datalake;
+
+CREATE TABLE `topic` (
+ `name` varchar(255) NOT NULL,
+ `correlate_cleared_message` bit(1) DEFAULT NULL,
+ `enabled` bit(1) DEFAULT NULL,
+ `login` varchar(255) DEFAULT NULL,
+ `message_id_path` varchar(255) DEFAULT NULL,
+ `pass` varchar(255) DEFAULT NULL,
+ `save_raw` bit(1) DEFAULT NULL,
+ `ttl` int(11) DEFAULT NULL,
+ `data_format` varchar(255) DEFAULT NULL,
+ `default_topic` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`name`),
+ KEY `FK_default_topic` (`default_topic`),
+ CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+CREATE TABLE `db` (
+ `name` varchar(255) NOT NULL,
+ `host` varchar(255) DEFAULT NULL,
+ `login` varchar(255) DEFAULT NULL,
+ `pass` varchar(255) DEFAULT NULL,
+ `property1` varchar(255) DEFAULT NULL,
+ `property2` varchar(255) DEFAULT NULL,
+ `property3` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+CREATE TABLE `map_db_topic` (
+ `db_name` varchar(255) NOT NULL,
+ `topic_name` varchar(255) NOT NULL,
+ PRIMARY KEY (`db_name`,`topic_name`),
+ KEY `FK_topic_name` (`topic_name`),
+ CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`),
+ CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');
+insert into db (name,host) values ('Elasticsearch','dl_es');
+insert into db (name,host) values ('MongoDB','dl_mongodb');
+insert into db (name,host) values ('Druid','dl_druid');
+
+
+-- in production, default enabled should be off
+insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
+insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
+
+
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 108eb4e0..1136e304 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -20,13 +20,9 @@ package org.onap.datalake.feeder.config; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.FilterType; -import org.springframework.context.annotation.ComponentScan; import lombok.Getter; import lombok.Setter; @@ -42,34 +38,18 @@ import lombok.Setter; @Setter @SpringBootConfiguration @ConfigurationProperties -//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class)) -//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test @EnableAutoConfiguration -//@Profile("test") public class ApplicationConfiguration { - private String couchbaseHost; - private String couchbaseUser; - private String couchbasePass; - private String couchbaseBucket; - - // private int mongodbPort; - // private String mongodbDatabase; - private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; private long dmaapKafkaTimeout; -// private boolean dmaapMonitorAllTopics; private int dmaapCheckNewTopicIntervalInSec; - //private String dmaapHostPort; - //private Set<String> dmaapExcludeTopics; - //private Set<String> dmaapIncludeTopics; private int kafkaConsumerCount; private boolean async; - private String elasticsearchHost; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java new file mode 100644 index 00000000..c34befcc --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java @@ -0,0 +1,135 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; +import java.util.Set; + +import javax.servlet.http.HttpServletResponse; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.service.DbService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller manages the big data storage settings. All the settings are saved in database. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class DbController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbRepository dbRepository; + + @Autowired + private DbService dbService; + + //list all dbs + @GetMapping("/") + @ResponseBody + public Iterable<Db> list() throws IOException { + Iterable<Db> ret = dbRepository.findAll(); + return ret; + } + + //Read a db + //the topics are missing in the return, since in we use @JsonBackReference on Db's topics + //need to the the following method to retrieve the topic list + @GetMapping("/{name}") + @ResponseBody + public Db getDb(@PathVariable("name") String dbName) throws IOException { + Db db = dbService.getDb(dbName); + return db; + } + + //Read topics in a DB + @GetMapping("/{name}/topics") + @ResponseBody + public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException { + Db db = dbService.getDb(dbName); + Set<Topic> topics = db.getTopics(); + return topics; + } + + //Update Db + @PutMapping("/") + @ResponseBody + public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DB: "+result.toString()); + return null; + } + + Db oldDb = getDb(db.getName()); + if (oldDb == null) { + sendError(response, 404, "Db not found: "+db.getName()); + return null; + } else { + dbRepository.save(db); + return db; + } + } + + //create a new Db + @PostMapping("/") + @ResponseBody + public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DB: "+result.toString()); + return null; + } + + Db oldDb = getDb(db.getName()); + if (oldDb != null) { + sendError(response, 400, "Db already exists: "+db.getName()); + return null; + } else { + dbRepository.save(db); + return db; + } + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java index 2b176370..2e13e1af 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -37,8 +38,8 @@ import org.springframework.web.bind.annotation.RestController; */ @RestController -@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE }) -public class PullController { +@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE }) +public class FeederController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -49,7 +50,7 @@ public class PullController { * @return message that application is started * @throws IOException */ - @RequestMapping("/start") + @GetMapping("/start") public String start() throws IOException { log.info("DataLake feeder starting to pull data from DMaaP..."); pullService.start(); @@ -59,7 +60,7 @@ public class PullController { /** * @return message that application stop process is triggered */ - @RequestMapping("/stop") + @GetMapping("/stop") public String stop() { pullService.shutdown(); log.info("DataLake feeder is stopped."); @@ -68,9 +69,9 @@ public class PullController { /** * @return feeder status */ - @RequestMapping("/status") + @GetMapping("/status") public String status() { - String status = "to be impletemented"; + String status = "Feeder is running: "+pullService.isRunning(); log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. return status; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 25028d58..c4aec14c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -21,36 +21,44 @@ package org.onap.datalake.feeder.controller; import java.io.IOException; import java.util.List; -import java.util.Optional; +import java.util.Set; +import javax.servlet.http.HttpServletResponse; + +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.DbService; import org.onap.datalake.feeder.service.DmaapService; +import org.onap.datalake.feeder.service.TopicService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; /** - * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as - * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is - * used for that topic. All the settings are saved in Couchbase. topic - * "_DL_DEFAULT_" is populated at setup by a DB script. + * This controller manages topic settings. + * + * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. + * All the settings are saved in database. + * topic "_DL_DEFAULT_" is populated at setup by a DB script. * * @author Guobiao Mo * */ @RestController -@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) +@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE}) public class TopicController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -60,7 +68,13 @@ public class TopicController { @Autowired private TopicRepository topicRepository; + + @Autowired + private TopicService topicService; + @Autowired + private DbService dbService; + //list all topics in DMaaP @GetMapping("/dmaap/") @ResponseBody @@ -77,33 +91,45 @@ public class TopicController { } //Read a topic - @GetMapping("/{name}") + @GetMapping("/{topicname}") @ResponseBody - public Topic getTopic(@PathVariable("name") String topicName) throws IOException { - //Topic topic = topicRepository.findFirstById(topicName); - Optional<Topic> topic = topicRepository.findById(topicName); - if (topic.isPresent()) { - return topic.get(); - } else { - return null; - } + public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException { + Topic topic = topicService.getTopic(topicName); + return topic; + } + + //Read DBs in a topic + @GetMapping("/{topicname}/dbs") + @ResponseBody + public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + return dbs; } //Update Topic + //This is not a partial update: old topic is wiped out, and new topic is created base on the input json. + //One exception is that old DBs are kept @PutMapping("/") @ResponseBody - public Topic updateTopic(Topic topic, BindingResult result) throws IOException { + public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { if (result.hasErrors()) { - log.error(result.toString()); - - return null;//TODO return binding error + sendError(response, 400, "Error parsing Topic: "+result.toString()); + return null; } - Topic oldTopic = getTopic(topic.getId()); + Topic oldTopic = getTopic(topic.getName()); if (oldTopic == null) { - return null;//TODO return not found error + sendError(response, 404, "Topic not found "+topic.getName()); + return null; } else { + if(!topic.isDefault()) { + Topic defaultTopic = topicService.getDefaultTopic(); + topic.setDefaultTopic(defaultTopic); + } + + topic.setDbs(oldTopic.getDbs()); topicRepository.save(topic); return topic; } @@ -112,20 +138,56 @@ public class TopicController { //create a new Topic @PostMapping("/") @ResponseBody - public Topic createTopic(Topic topic, BindingResult result) throws IOException { - + public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { + if (result.hasErrors()) { - log.error(result.toString()); + sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - Topic oldTopic = getTopic(topic.getId()); + Topic oldTopic = getTopic(topic.getName()); if (oldTopic != null) { - return null;//TODO return 'already exists' error + sendError(response, 400, "Topic already exists "+topic.getName()); + return null; } else { + if(!topic.isDefault()) { + Topic defaultTopic = topicService.getDefaultTopic(); + topic.setDefaultTopic(defaultTopic); + } + topicRepository.save(topic); return topic; } } + //delete a db from the topic + @DeleteMapping("/{topicname}/db/{dbname}") + @ResponseBody + public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + dbs.remove(new Db(dbName)); + + topicRepository.save(topic); + return topic.getDbs(); + } + + //add a db to the topic + @PutMapping("/{topicname}/db/{dbname}") + @ResponseBody + public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + + Db db = dbService.getDb(dbName); + dbs.add(db); + + topicRepository.save(topic); + return topic.getDbs(); + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java new file mode 100644 index 00000000..bbaedadc --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -0,0 +1,83 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.Set; + +import javax.persistence.CascadeType; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.ManyToMany; +import javax.persistence.Table; + +import com.fasterxml.jackson.annotation.JsonBackReference; + +import lombok.Getter; +import lombok.Setter; + +/** + * Domain class representing bid data storage + * + * @author Guobiao Mo + * + */ +@Setter +@Getter +@Entity +@Table(name = "db") +public class Db { + @Id + private String name; + + private String host; + private String login; + private String pass; + + private String property1; + private String property2; + private String property3; + + @JsonBackReference + @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL) + /* + @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="db_name") }, + inverseJoinColumns = { @JoinColumn(name="topic_name") } + ) */ + protected Set<Topic> topics; + + public Db() { + } + + public Db(String name) { + this.name = name; + } + + @Override + public boolean equals(Object obj) { + return name.equals(((Db)obj).getName()); + } + + @Override + public int hashCode() { + return name.hashCode(); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index ace33dcc..e1da4d4d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -19,39 +19,61 @@ */ package org.onap.datalake.feeder.domain; +import java.util.Set; import java.util.function.Predicate; -import javax.validation.constraints.NotNull; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.JoinTable; +import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; +import javax.persistence.Table; import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.onap.datalake.feeder.enumeration.DataFormat; -import org.springframework.data.annotation.Id; -import org.springframework.data.annotation.Transient; -import org.springframework.data.couchbase.core.mapping.Document; +import com.fasterxml.jackson.annotation.JsonBackReference; + +import lombok.Getter; import lombok.Setter; /** - * Domain class representing topic table in Couchbase + * Domain class representing topic * * @author Guobiao Mo * */ -@Document @Setter +@Getter +@Entity +@Table(name = "topic") public class Topic { - @NotNull @Id - private String id;//topic name + private String name;//topic name - @Transient + @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @JoinColumn(name = "default_topic", nullable = true) private Topic defaultTopic; //for protected Kafka topics private String login; private String pass; + //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) + @JsonBackReference + //@JsonManagedReference + @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="topic_name") }, + inverseJoinColumns = { @JoinColumn(name="db_name") } + ) + protected Set<Db> dbs; + /** * indicate if we should monitor this topic */ @@ -60,20 +82,14 @@ public class Topic { /** * save raw message text */ + @Column(name = "save_raw") private Boolean saveRaw; /** - * true: save it to Elasticsearch false: don't save null: use default - */ - private Boolean supportElasticsearch; - private Boolean supportCouchbase; - private Boolean supportDruid; - - /** - * need to explicitly tell feeder the data format of the message + * need to explicitly tell feeder the data format of the message. * support JSON, XML, YAML, TEXT */ - private DataFormat dataFormat; + private String dataFormat; /** * TTL in day @@ -81,26 +97,24 @@ public class Topic { private Integer ttl; //if this flag is true, need to correlate alarm cleared message to previous alarm + @Column(name = "correlate_cleared_message") private Boolean correlateClearedMessage; //the value in the JSON with this path will be used as DB id + @Column(name = "message_id_path") private String messageIdPath; public Topic() { } - public Topic(String id) { - this.id = id; + public Topic(String name) { + this.name = name; } - public String getId() { - return id; + public boolean isDefault() { + return "_DL_DEFAULT_".equals(name); } - public void setDefaultTopic(Topic defaultTopic) { - this.defaultTopic = defaultTopic; - } - public boolean isEnabled() { return is(enabled, Topic::isEnabled); } @@ -121,7 +135,7 @@ public class Topic { public DataFormat getDataFormat() { if (dataFormat != null) { - return dataFormat; + return DataFormat.fromString(dataFormat); } else if (defaultTopic != null) { return defaultTopic.getDataFormat(); } else { @@ -148,24 +162,51 @@ public class Topic { return is(saveRaw, Topic::isSaveRaw); } - public boolean isSupportElasticsearch() { - return is(supportElasticsearch, Topic::isSupportElasticsearch); + public boolean supportElasticsearch() { + return containDb("Elasticsearch");//TODO string hard codes + } + + public boolean supportCouchbase() { + return containDb("Couchbase"); } - public boolean isSupportCouchbase() { - return is(supportCouchbase, Topic::isSupportCouchbase); + public boolean supportDruid() { + return containDb("Druid"); } - public boolean isSupportDruid() { - return is(supportDruid, Topic::isSupportDruid); + public boolean supportMongoDB() { + return containDb("MongoDB"); } - //extract DB id from a JSON attribute, TODO support multiple attributes + private boolean containDb(String dbName) { + Db db = new Db(dbName); + + if(dbs!=null && dbs.contains(db)) { + return true; + } + + if (defaultTopic != null) { + return defaultTopic.containDb(dbName); + } else { + return false; + } + } + + //extract DB id from JSON attributes, support multiple attributes public String getMessageId(JSONObject json) { String id = null; if(StringUtils.isNotBlank(messageIdPath)) { - id = json.query(messageIdPath).toString(); + String[] paths=messageIdPath.split(","); + + StringBuilder sb= new StringBuilder(); + for(int i=0; i<paths.length; i++) { + if(i>0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); } return id; @@ -173,20 +214,17 @@ public class Topic { @Override public String toString() { - return id; + return name; } - /** - * @return the messageIdPath - */ - public String getMessageIdPath() { - return messageIdPath; + @Override + public boolean equals(Object obj) { + return name.equals(((Topic)obj).getName()); } - /** - * @param messageIdPath the messageIdPath to set - */ - public void setMessageIdPath(String messageIdPath) { - this.messageIdPath = messageIdPath; + @Override + public int hashCode() { + return name.hashCode(); } + } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java index 220a8f76..ae03f469 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java @@ -19,10 +19,18 @@ */
package org.onap.datalake.feeder.repository;
+import org.onap.datalake.feeder.domain.Db;
+
+import org.springframework.data.repository.CrudRepository;
+
/**
+ *
+ * Db Repository
+ *
* @author Guobiao Mo
*
- */
-public interface TopicRepositoryCustom {
- long updateTopic(String topic, Boolean state);
+ */
+
+public interface DbRepository extends CrudRepository<Db, String> {
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java index 37d1a669..2d9adef8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -20,48 +20,17 @@ package org.onap.datalake.feeder.repository;
import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
-import org.springframework.data.couchbase.core.query.Query;
-import org.springframework.data.couchbase.core.query.ViewIndexed;
-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.Pageable;
-
-import java.util.List;
+import org.springframework.data.repository.CrudRepository;
/**
*
- * Topic Repository interface, implementation is taken care by Spring framework.
- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl.
+ * Topic Repository
*
* @author Guobiao Mo
*
- */
-@ViewIndexed(designDoc = "topic", viewName = "all")
-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {
-/*
- Topic findFirstById(String topic);
-
- Topic findByIdAndState(String topic, boolean state);
-
- //Supports native JSON query string
- @Query("{topic:'?0'}")
- Topic findTopicById(String topic);
-
- @Query("{topic: { $regex: ?0 } })")
- List<Topic> findTopicByRegExId(String topic);
-
-
- //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);
-
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")
- Topic findByCompanyAndAreaId(String companyId, String areaId);
+ */
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")
- List<Topic> findByPhoneNumber(String telephoneNumber);
+public interface TopicRepository extends CrudRepository<Topic, String> {
- @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")
- Long countBuildings(String companyId);
- */
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java deleted file mode 100644 index 018d5b95..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/*
-* ============LICENSE_START=======================================================
-* ONAP : DataLake
-* ================================================================================
-* Copyright 2019 China Mobile
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.datalake.feeder.repository;
-
-import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.couchbase.core.CouchbaseTemplate;
-/*
-import org.springframework.data.mongodb.MongoDbFactory;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
-import org.springframework.data.mongodb.core.query.Criteria;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
-
-import com.mongodb.WriteResult;
-import com.mongodb.client.result.UpdateResult;
-*/
-import java.util.List;
-
-/**
- * @author Guobiao Mo
- *
- */
-public class TopicRepositoryImpl implements TopicRepositoryCustom {
-
- @Autowired
- CouchbaseTemplate template;
-
- @Override
- public long updateTopic(String topic, Boolean state) {
-/*
- Query query = new Query(Criteria.where("id").is(topic));
- Update update = new Update();
- update.set("state", state);
-
- UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);
-
- if(result!=null)
- return result.getModifiedCount();
- else
- */ return 0L;
-
-
-
- }
-}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index 35432587..f74829e1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -27,7 +27,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.json.JSONObject; -import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,18 +57,20 @@ public class CouchbaseService { private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired - private ApplicationConfiguration config; - + private DbService dbService; + Bucket bucket; @PostConstruct private void init() { // Initialize Couchbase Connection - Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost()); - cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass()); - bucket = cluster.openBucket(config.getCouchbaseBucket()); + + Db couchbase = dbService.getCouchbase(); + Cluster cluster = CouchbaseCluster.create(couchbase.getHost()); + cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); + bucket = cluster.openBucket(couchbase.getProperty1()); - log.info("Connect to Couchbase " + config.getCouchbaseHost()); + log.info("Connect to Couchbase " + couchbase.getHost()); // Create a N1QL Primary Index (but ignore if it exists) bucket.bucketManager().createN1qlPrimaryIndex(true, false); @@ -90,15 +92,21 @@ public class CouchbaseService { //setup TTL int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second - String id = getId(topic.getId()); + String id = getId(topic, json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } saveDocuments(documents); } - - private String getId(String topicStr) { + public String getId(Topic topic, JSONObject json) { + //if this topic requires extract id from JSON + String id = topic.getMessageId(json); + if(id != null) { + return id; + } + + String topicStr= topic.getName(); //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 @@ -106,7 +114,7 @@ public class CouchbaseService { // increment by 1, initialize at 0 if counter doc not found //TODO how slow is this compared with above UUID approach? JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 - String id = topicStr +":"+ nextIdNumber.content(); + id = topicStr +":"+ nextIdNumber.content(); return id; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java new file mode 100644 index 00000000..f0d943d3 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.Optional; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.repository.DbRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service for Dbs + * + * @author Guobiao Mo + * + */ +@Service +public class DbService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbRepository dbRepository; + + public Db getDb(String name) { + Optional<Db> ret = dbRepository.findById(name); + return ret.isPresent() ? ret.get() : null; + } + + public Db getCouchbase() { + return getDb("Couchbase"); + } + + public Db getElasticsearch() { + return getDb("Elasticsearch"); + } + + public Db getMongoDB() { + return getDb("MongoDB"); + } + + public Db getDruid() { + return getDb("Druid"); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index cbcc5f86..1f637e1a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,14 +62,18 @@ public class ElasticsearchService { @Autowired private ApplicationConfiguration config; + + @Autowired + private DbService dbService; private RestHighLevelClient client; ActionListener<BulkResponse> listener; @PostConstruct private void init() { - String elasticsearchHost = config.getElasticsearchHost(); - + Db elasticsearch = dbService.getElasticsearch(); + String elasticsearchHost = elasticsearch.getHost(); + // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); @@ -93,14 +99,16 @@ public class ElasticsearchService { public void ensureTableExist(String topic) throws IOException { String topicLower = topic.toLowerCase(); - - CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); - try { + + GetIndexRequest request = new GetIndexRequest(); + request.indices(topicLower); + + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); + if(!exists){ + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); - }catch(ElasticsearchStatusException e) { - log.info("{} create ES topic status: {}", topic, e.getDetailedMessage()); - } + } } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME @@ -114,7 +122,10 @@ public class ElasticsearchService { continue; } } - request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON)); + + String id = topic.getMessageId(json); //id can be null + + request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON)); } if(config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); @@ -122,19 +133,23 @@ public class ElasticsearchService { try { client.bulk(request, RequestOptions.DEFAULT); } catch (IOException e) { - log.error( topic.getId() , e); + log.error( topic.getName() , e); } } } private boolean correlateClearedMessage(JSONObject json) { - boolean found = false; - + boolean found = true; + /*TODO * 1. check if this is a alarm cleared message * 2. search previous alarm message * 3. update previous message, if success, set found=true */ + //for Sonar test, remove the following + if(json.isNull("kkkkk")) { + found = false; + } return found; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java index 3dcbd8ee..4433c8c4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -27,8 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import javax.annotation.PostConstruct; - import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,10 +56,13 @@ public class PullService { @Autowired private ApplicationConfiguration config; - @PostConstruct - private void init() { + /** + * @return the isRunning + */ + public boolean isRunning() { + return isRunning; } - + /** * start pulling. * @@ -109,6 +110,7 @@ public class PullService { executorService.awaitTermination(10L, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("executor.awaitTermination", e); + Thread.currentThread().interrupt(); } isRunning = false; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 1cd3a8a1..84e4fb7d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -152,11 +152,11 @@ public class StoreService { } private void saveJsons(Topic topic, List<JSONObject> jsons) { - if (topic.isSupportCouchbase()) { + if (topic.supportCouchbase()) { couchbaseService.saveJsons(topic, jsons); } - if (topic.isSupportElasticsearch()) { + if (topic.supportElasticsearch()) { elasticsearchService.saveJsons(topic, jsons); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 9b8fabc1..4e10a365 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -34,8 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service for topics topic setting is stored in Couchbase, bucket 'dl', see - * application.properties for Spring setup + * Service for topics * * @author Guobiao Mo * @@ -68,15 +67,14 @@ public class TopicService { return null; } + //TODO caller should not modify the returned topic, maybe return a clone public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { Topic topic = getTopic(topicStr); if (topic == null) { - topic = new Topic(topicStr); + topic = getDefaultTopic(); } - - topic.setDefaultTopic(getDefaultTopic()); - if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { + if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { elasticsearchService.ensureTableExist(topicStr); } return topic; diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index a0ab90fe..ea94d004 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -2,6 +2,16 @@ server.port = 1680 +# Spring connection to MariaDB for ORM +#spring.jpa.hibernate.ddl-auto=update +spring.jpa.hibernate.ddl-auto=none +spring.jpa.show-sql=false + +#spring.datasource.driver-class-name=com.mysql.jdbc.Driver +spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8 +spring.datasource.username=nook +spring.datasource.password=nook123 + #For Beijing lab #dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 @@ -30,24 +40,3 @@ logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG -# Spring connection to Couchbase DB for ORM - -#not work when run in osboxes, but works in Eclipse -spring.couchbase.bootstrap-hosts=dl_couchbase -#spring.couchbase.bootstrap-hosts=172.30.1.74 - -#a user with name as bucket.name must be created, with the pass as bucket.password -# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase -spring.couchbase.bucket.name=dl -spring.couchbase.bucket.password=dl1234 -spring.data.couchbase.auto-index=true - -#DL Feeder DB: Couchbase -couchbaseHost=dl_couchbase -#couchbaseHost=172.30.1.74 -couchbaseUser=dmaap -couchbasePass=dmaap1234 -couchbaseBucket=dmaap - -#DL Feeder DB: Elasticsearch -elasticsearchHost=dl_es diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json index e536c7b3..a20e5eb3 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json @@ -227,7 +227,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json new file mode 100644 index 00000000..cb3c98da --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json @@ -0,0 +1,52 @@ +{
+ "_id": "5bb3dfae5bea3f1cb49d4f3f",
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "esr-thirdparty-sdnc",
+ "top-entity-type": "esr-thirdparty-sdnc",
+ "entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1",
+ "event-type": "AAI-EVENT",
+ "domain": "dev",
+ "action": "CREATE",
+ "sequence-number": "0",
+ "id": "69f2935f-c3c1-4a63-b3f1-519c3898328b",
+ "source-name": "ying",
+ "version": "v11",
+ "timestamp": "20180919-06:20:44:216"
+ },
+ "entity": {
+ "thirdparty-sdnc-id": "SDWANController1",
+ "resource-version": "1537338043473",
+ "location": "Core",
+ "product-name": "SD-WAN",
+ "esr-system-info-list": {
+ "esr-system-info": [
+ {
+ "esr-system-info-id": "SDWANController1-ESR-1",
+ "system-type": "example-system-type-val-12078",
+ "service-url": "https://172.19.48.77:18008",
+ "ssl-cacert": "example-ssl-cacert-val-20589",
+ "type": "WAN",
+ "ssl-insecure": true,
+ "system-status": "example-system-status-val-23435",
+ "version": "V3R1",
+ "passive": true,
+ "password": "Onap@12345",
+ "protocol": "RESTCONF",
+ "ip-address": "172.19.48.77",
+ "cloud-domain": "example-cloud-domain-val-76077",
+ "user-name": "northapi1@huawei.com",
+ "system-name": "SDWANController",
+ "port": "18008",
+ "vendor": "IP-WAN",
+ "resource-version": "1537338044166",
+ "remote-path": "example-remotepath-val-5833",
+ "default-tenant": "example-default-tenant-val-71148"
+ }
+ ]
+ }
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}"
+}
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json index 16eb1636..19bf6ed5 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json @@ -142,7 +142,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json index 3c871a87..6797b19e 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json @@ -200,7 +200,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json index c3f60372..f910acee 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json @@ -302,7 +302,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json new file mode 100644 index 00000000..957060f4 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json @@ -0,0 +1,75 @@ +{
+ "_id": {
+ "$oid": "5bb3d3aa5bea3f41300957c6"
+ },
+ "sendEpsShort": {
+ "summary": "0.0 send eps (10 mins)",
+ "raw": 0
+ },
+ "recvEpsInstant": {
+ "summary": "0.03333333333333333 recv eps (1 min)",
+ "raw": 0.03333333333333333
+ },
+ "fanOut": {
+ "summary": "0.7051873815491838 sends per recv",
+ "raw": 0.7051873815491838
+ },
+ "sendEpsLong": {
+ "summary": "0.0 send eps (1 hr)",
+ "raw": 0
+ },
+ "kafkaConsumerTimeouts": {
+ "summary": "164 Kafka Consumers Timedout",
+ "raw": 164
+ },
+ "recvEpsLong": {
+ "summary": "0.03333333333333333 recv eps (1 hr)",
+ "raw": 0.03333333333333333
+ },
+ "sendEpsInstant": {
+ "summary": "0.0 send eps (1 min)",
+ "raw": 0
+ },
+ "recvEpsShort": {
+ "summary": "0.03333333333333333 recv eps (10 mins)",
+ "raw": 0.03333333333333333
+ },
+ "kafkaConsumerClaims": {
+ "summary": "1 Kafka Consumers Claimed",
+ "raw": 1
+ },
+ "version": {
+ "summary": "Version 1.1.3",
+ "raw": 0
+ },
+ "upTime": {
+ "summary": "604800 seconds since start",
+ "raw": 604800
+ },
+ "sendTotalEvents": {
+ "summary": "46139 Total events sent since start",
+ "raw": 46139
+ },
+ "hostname": "3d5704fccbc5",
+ "kafkaConsumerCacheMiss": {
+ "summary": "179 Kafka Consumer Cache Misses",
+ "raw": 179
+ },
+ "metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC",
+ "kafkaConsumerCacheHit": {
+ "summary": "317143 Kafka Consumer Cache Hits",
+ "raw": 317143
+ },
+ "now": 1537639709380,
+ "transactionEnabled": false,
+ "startTime": {
+ "summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC",
+ "raw": 1537034908
+ },
+ "recvTotalEvents": {
+ "summary": "65428 Total events received since start",
+ "raw": 65428
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}"
+}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json new file mode 100644 index 00000000..8de08f76 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json @@ -0,0 +1,25 @@ +{
+ "_id": "5bb3d3ad5bea3f41300959ba",
+ "closedLoopEventClient": "DCAE.HolmesInstance",
+ "policyVersion": "1.0.0.5",
+ "policyName": "CCVPN",
+ "policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8",
+ "target_type": "VM",
+ "AAI": {
+ "serviceType": "TestService",
+ "service-instance_service-instance-id": "200",
+ "globalSubscriberId": "Customer1",
+ "vserver_vserver-name": "TBD",
+ "network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200"
+ },
+ "closedLoopAlarmStart": "1532769303924000",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b",
+ "target": "vserver.vserver-name",
+ "closedLoopAlarmEnd": "1532769303924000",
+ "requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87",
+ "from": "DCAE",
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}"
+}
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json new file mode 100644 index 00000000..bb506d54 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json @@ -0,0 +1,57 @@ +{
+ "_id": "5bb3d3b45bea3f41300960f8",
+ "event": {
+ "commonEventHeader": {
+ "sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",
+ "startEpochMicrosec": "1537438335829000",
+ "eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857",
+ "domain": "fault",
+ "lastEpochMicrosec": 1537438335829000,
+ "eventName": "Fault_Route_Status",
+ "sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",
+ "priority": "High",
+ "version": 3,
+ "reportingEntityName": "Domain_Contorller"
+ },
+ "faultFields": {
+ "eventSeverity": "CRITICAL",
+ "alarmCondition": "Route_Status",
+ "faultFieldsVersion": 2,
+ "specificProblem": "Fault_SOTN_Service_Status",
+ "alarmAdditionalInformation": [
+ {
+ "name": "networkId",
+ "value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100"
+ },
+ {
+ "name": "node",
+ "value": "11.11.11.11"
+ },
+ {
+ "name": "tp-id",
+ "value": "27"
+ },
+ {
+ "name": "oper-status",
+ "value": "down"
+ },
+ {
+ "name": "network-ref",
+ "value": "providerId/5555/clientId/6666/topologyId/33"
+ },
+ {
+ "name": "node-ref",
+ "value": "0.51.0.103"
+ },
+ {
+ "name": "tp-ref",
+ "value": "4"
+ }
+ ],
+ "eventSourceType": "other",
+ "vfStatus": "Active"
+ }
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}"
+}
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java index 934451fe..02db5a25 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -56,10 +56,6 @@ public class ApplicationConfigurationTest { @Test public void readConfig() { - assertNotNull(config.getCouchbaseHost()); - assertNotNull(config.getCouchbaseUser()); - assertNotNull(config.getCouchbasePass()); - assertNotNull(config.getCouchbaseBucket()); assertNotNull(config.getDmaapZookeeperHostPort()); assertNotNull(config.getDmaapKafkaHostPort()); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java new file mode 100644 index 00000000..ea1d6894 --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java @@ -0,0 +1,44 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import org.junit.Test; + +/** + * Test Db + * + * @author Guobiao Mo + * + */ + +public class DbTest { + + @Test + public void testIs() { + Db couchbase=new Db("Couchbase"); + Db mongoDB=new Db("MongoDB"); + Db mongoDB2=new Db("MongoDB"); + assertNotEquals(couchbase.hashCode(), mongoDB.hashCode()); + assertNotEquals(couchbase, mongoDB); + assertEquals(mongoDB, mongoDB2); + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java index 23ec3b10..1e402522 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -23,8 +23,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.HashSet; + import org.json.JSONObject; -import org.junit.Test; +import org.junit.Test; +import org.onap.datalake.feeder.enumeration.DataFormat; /** * Test Topic @@ -50,17 +53,49 @@ public class TopicTest { } @Test + public void getMessageIdFromMultipleAttributes() { + String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}"; + + JSONObject json = new JSONObject(text); + + Topic topic = new Topic("test getMessageId"); + topic.setMessageIdPath("/data/data2/value,/data/data3"); + + String value = topic.getMessageId(json); + + assertEquals(value, "hello^world"); + } + + @Test public void testIs() { - Topic defaultTopic=new Topic("default"); + Topic defaultTopic=new Topic("_DL_DEFAULT_"); Topic testTopic = new Topic("test"); testTopic.setDefaultTopic(defaultTopic); + + assertTrue(defaultTopic.isDefault()); + assertFalse(testTopic.isDefault()); + + assertTrue(testTopic.equals(new Topic("test"))); + assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); + + defaultTopic.setDbs(new HashSet<>()); + defaultTopic.getDbs().add(new Db("Elasticsearch")); + assertTrue(testTopic.supportElasticsearch()); + assertFalse(testTopic.supportCouchbase()); + assertFalse(testTopic.supportDruid()); + assertFalse(testTopic.supportMongoDB()); + + defaultTopic.getDbs().remove(new Db("Elasticsearch")); + assertFalse(testTopic.supportElasticsearch()); - defaultTopic.setSupportElasticsearch(true); - boolean b = testTopic.isSupportElasticsearch(); - assertTrue(b); + defaultTopic.setCorrelateClearedMessage(true); + defaultTopic.setDataFormat("XML"); + defaultTopic.setEnabled(true); + defaultTopic.setSaveRaw(true); + assertTrue(testTopic.isCorrelateClearedMessage()); + assertTrue(testTopic.isEnabled()); + assertTrue(testTopic.isSaveRaw()); - defaultTopic.setSupportElasticsearch(false); - b = testTopic.isSupportElasticsearch(); - assertFalse(b); + assertEquals(defaultTopic.getDataFormat(), DataFormat.XML); } } diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties index ede5999b..d6d98e64 100644 --- a/components/datalake-handler/feeder/src/test/resources/application.properties +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -2,14 +2,6 @@ server.port = 1680 - -#For Beijing lab -#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 -#dmaapKafkaHostPort=kafka.mr01.onap.vip:80 -#spring.couchbase.bootstrap-hosts=172.30.1.74 -#couchbaseHost=172.30.1.74 - - #DMaaP #dmaapZookeeperHostPort=127.0.0.1:2181 #dmaapKafkaHostPort=127.0.0.1:9092 @@ -30,13 +22,4 @@ logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG - -#DL Feeder DB: Couchbase -couchbaseHost=dl_couchbase -#couchbaseHost=172.30.1.74 -couchbaseUser=dmaap -couchbasePass=dmaap1234 -couchbaseBucket=dmaap - -#DL Feeder DB: Elasticsearch -elasticsearchHost=dl_es + diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml index ede2a27b..a526cb54 100644 --- a/components/datalake-handler/pom.xml +++ b/components/datalake-handler/pom.xml @@ -31,7 +31,7 @@ <springcouchbase.version>3.1.2.RELEASE</springcouchbase.version> <jackson.version>2.9.6</jackson.version> <kafka.version>2.0.0</kafka.version> - <elasticsearchjava.version>6.6.0</elasticsearchjava.version> + <elasticsearchjava.version>6.7.0</elasticsearchjava.version> </properties> @@ -39,6 +39,12 @@ <dependencies> <dependency> + <groupId>org.mariadb.jdbc</groupId> + <artifactId>mariadb-java-client</artifactId> + <version>2.4.1</version> + </dependency> + + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> @@ -142,6 +148,13 @@ <version>${springboot.version}</version> </dependency> <!-- end::actuator[] --> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-data-jpa</artifactId> + <version>${springboot.version}</version> + </dependency> + <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-couchbase</artifactId> |