From 8dc9d71a2465f5c1e4beb52c2375efe02bcde174 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Sat, 6 Apr 2019 21:52:09 -0700 Subject: Use MariaDB to store application configurations Issue-ID: DCAEGEN2-1400 Change-Id: I86b5bc25d84b98f7ac84b95f1690089dcebe7f0a Signed-off-by: Guobiao Mo --- components/datalake-handler/feeder/pom.xml | 15 ++- .../feeder/src/assembly/docker-compose.yml | 10 -- .../feeder/src/assembly/scripts/init_db.sql | 54 +++++++++ .../feeder/config/ApplicationConfiguration.java | 20 --- .../datalake/feeder/controller/DbController.java | 135 +++++++++++++++++++++ .../feeder/controller/FeederController.java | 78 ++++++++++++ .../datalake/feeder/controller/PullController.java | 77 ------------ .../feeder/controller/TopicController.java | 114 +++++++++++++---- .../java/org/onap/datalake/feeder/domain/Db.java | 83 +++++++++++++ .../org/onap/datalake/feeder/domain/Topic.java | 130 +++++++++++++------- .../datalake/feeder/repository/DbRepository.java | 36 ++++++ .../feeder/repository/TopicRepository.java | 39 +----- .../feeder/repository/TopicRepositoryCustom.java | 28 ----- .../feeder/repository/TopicRepositoryImpl.java | 67 ---------- .../datalake/feeder/service/CouchbaseService.java | 30 +++-- .../onap/datalake/feeder/service/DbService.java | 67 ++++++++++ .../feeder/service/ElasticsearchService.java | 39 ++++-- .../onap/datalake/feeder/service/PullService.java | 12 +- .../onap/datalake/feeder/service/StoreService.java | 4 +- .../onap/datalake/feeder/service/TopicService.java | 10 +- .../src/main/resources/application.properties | 31 ++--- .../druid/AAI-EVENT-kafka-supervisor.json | 2 +- .../resources/druid/AAI-EVENT-sample-format.json | 52 ++++++++ .../druid/DCAE_CL_OUTPUT-kafka-supervisor.json | 2 +- .../druid/SEC_FAULT_OUTPUT-kafka-supervisor.json | 2 +- ...rtr.apinode.metrics.dmaap-kafka-supervisor.json | 2 +- ...msgrtr.apinode.metrics.dmaap-sample-format.json | 75 ++++++++++++ ...authenticated.DCAE_CL_OUTPUT-sample-format.json | 25 ++++ ...thenticated.SEC_FAULT_OUTPUT-sample-format.json | 57 +++++++++ .../config/ApplicationConfigurationTest.java | 4 - .../org/onap/datalake/feeder/domain/DbTest.java | 44 +++++++ .../org/onap/datalake/feeder/domain/TopicTest.java | 51 ++++++-- .../src/test/resources/application.properties | 19 +-- components/datalake-handler/pom.xml | 15 ++- 34 files changed, 1026 insertions(+), 403 deletions(-) delete mode 100644 components/datalake-handler/feeder/src/assembly/docker-compose.yml create mode 100644 components/datalake-handler/feeder/src/assembly/scripts/init_db.sql create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java create mode 
100644 components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json create mode 100644 components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json create mode 100644 components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json create mode 100644 components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json create mode 100644 components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java (limited to 'components/datalake-handler') diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index f88baf36..5b47a245 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -10,13 +10,19 @@ 1.0.0-SNAPSHOT - org.onap.datalake + org.onap.dcaegen2.services.components.datalake-handler feeder jar DataLake Feeder + + + org.mariadb.jdbc + mariadb-java-client + + org.json json @@ -42,7 +48,12 @@ org.springframework.boot spring-boot-starter-actuator - + + + org.springframework.boot + spring-boot-starter-data-jpa + + org.springframework.boot spring-boot-starter-data-couchbase diff --git a/components/datalake-handler/feeder/src/assembly/docker-compose.yml b/components/datalake-handler/feeder/src/assembly/docker-compose.yml deleted file mode 100644 index 7ca466b6..00000000 --- a/components/datalake-handler/feeder/src/assembly/docker-compose.yml +++ /dev/null @@ -1,10 +0,0 @@ -version: '2' -services: - - datalake: - image: moguobiao/datalake-storage - container_name: datalake-storage - environment: - - no-needed-dmaapHost=10.0.2.15:3904 - ports: - - "1680:1680" diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql new file mode 100644 index 00000000..2185320a --- /dev/null +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -0,0 +1,54 @@ +create database datalake; +use datalake; + +CREATE TABLE `topic` ( + `name` varchar(255) NOT NULL, + `correlate_cleared_message` bit(1) DEFAULT NULL, + `enabled` bit(1) DEFAULT NULL, + `login` varchar(255) DEFAULT NULL, + `message_id_path` varchar(255) DEFAULT NULL, + `pass` varchar(255) DEFAULT NULL, + `save_raw` bit(1) DEFAULT NULL, + `ttl` int(11) DEFAULT NULL, + `data_format` varchar(255) DEFAULT NULL, + `default_topic` varchar(255) DEFAULT NULL, + PRIMARY KEY (`name`), + KEY `FK_default_topic` (`default_topic`), + CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + + +CREATE TABLE `db` ( + `name` varchar(255) NOT NULL, + `host` varchar(255) DEFAULT NULL, + `login` varchar(255) DEFAULT NULL, + `pass` varchar(255) DEFAULT NULL, + `property1` varchar(255) DEFAULT NULL, + `property2` varchar(255) DEFAULT NULL, + `property3` varchar(255) DEFAULT NULL, + PRIMARY KEY (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + + +CREATE TABLE `map_db_topic` ( + `db_name` varchar(255) NOT NULL, + `topic_name` varchar(255) NOT NULL, + PRIMARY KEY (`db_name`,`topic_name`), + KEY `FK_topic_name` (`topic_name`), + CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`), + CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + + +insert into db (name,host,login,pass,property1) values 
('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap'); +insert into db (name,host) values ('Elasticsearch','dl_es'); +insert into db (name,host) values ('MongoDB','dl_mongodb'); +insert into db (name,host) values ('Druid','dl_druid'); + + +-- in production, default enabled should be off +insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON'); +insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0); + + +insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 108eb4e0..1136e304 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -20,13 +20,9 @@ package org.onap.datalake.feeder.config; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.FilterType; -import org.springframework.context.annotation.ComponentScan; import lombok.Getter; import lombok.Setter; @@ -42,34 +38,18 @@ import lombok.Setter; @Setter @SpringBootConfiguration @ConfigurationProperties -//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class)) -//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test @EnableAutoConfiguration -//@Profile("test") public class ApplicationConfiguration { - private String couchbaseHost; - private String couchbaseUser; - private String couchbasePass; - private String couchbaseBucket; - - // private int mongodbPort; - // private String mongodbDatabase; - private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; private long dmaapKafkaTimeout; -// private boolean dmaapMonitorAllTopics; private int dmaapCheckNewTopicIntervalInSec; - //private String dmaapHostPort; - //private Set dmaapExcludeTopics; - //private Set dmaapIncludeTopics; private int kafkaConsumerCount; private boolean async; - private String elasticsearchHost; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java new file mode 100644 index 00000000..c34befcc --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java @@ -0,0 +1,135 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; +import java.util.Set; + +import javax.servlet.http.HttpServletResponse; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.service.DbService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller manages the big data storage settings. All the settings are saved in the database. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class DbController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbRepository dbRepository; + + @Autowired + private DbService dbService; + + //list all dbs + @GetMapping("/") + @ResponseBody + public Iterable<Db> list() throws IOException { + Iterable<Db> ret = dbRepository.findAll(); + return ret; + } + + //Read a db + //the topics are missing in the response, since we use @JsonBackReference on Db's topics + //need to use the following method to retrieve the topic list + @GetMapping("/{name}") + @ResponseBody + public Db getDb(@PathVariable("name") String dbName) throws IOException { + Db db = dbService.getDb(dbName); + return db; + } + + //Read topics in a DB + @GetMapping("/{name}/topics") + @ResponseBody + public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException { + Db db = dbService.getDb(dbName); + Set<Topic> topics = db.getTopics(); + return topics; + } + + //Update Db + @PutMapping("/") + @ResponseBody + public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DB: "+result.toString()); + return null; + } + + Db oldDb = getDb(db.getName()); + if (oldDb == null) { + sendError(response, 404, "Db not found: "+db.getName()); + return null; + } else { + dbRepository.save(db); + return db; + } + } + + //create a new Db + @PostMapping("/") + @ResponseBody + public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException { + + if (result.hasErrors()) { + sendError(response, 400, "Error parsing DB: "+result.toString()); + return null; + } + + Db oldDb =
getDb(db.getName()); + if (oldDb != null) { + sendError(response, 400, "Db already exists: "+db.getName()); + return null; + } else { + dbRepository.save(db); + return db; + } + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java new file mode 100644 index 00000000..2e13e1af --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java @@ -0,0 +1,78 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; + +import org.onap.datalake.feeder.service.PullService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller controls DL data feeder. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE }) +public class FeederController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private PullService pullService; + + /** + * @return message that application is started + * @throws IOException + */ + @GetMapping("/start") + public String start() throws IOException { + log.info("DataLake feeder starting to pull data from DMaaP..."); + pullService.start(); + return "DataLake feeder is running."; + } + + /** + * @return message that application stop process is triggered + */ + @GetMapping("/stop") + public String stop() { + pullService.shutdown(); + log.info("DataLake feeder is stopped."); + return "DataLake feeder is stopped."; + } + /** + * @return feeder status + */ + @GetMapping("/status") + public String status() { + String status = "Feeder is running: "+pullService.isRunning(); + log.info("sending feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+ return status; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java deleted file mode 100644 index 2b176370..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java +++ /dev/null @@ -1,77 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DataLake -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.datalake.feeder.controller; - -import java.io.IOException; - -import org.onap.datalake.feeder.service.PullService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * This controller controls DL data feeder. - * - * @author Guobiao Mo - * - */ - -@RestController -@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE }) -public class PullController { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private PullService pullService; - - /** - * @return message that application is started - * @throws IOException - */ - @RequestMapping("/start") - public String start() throws IOException { - log.info("DataLake feeder starting to pull data from DMaaP..."); - pullService.start(); - return "DataLake feeder is running."; - } - - /** - * @return message that application stop process is triggered - */ - @RequestMapping("/stop") - public String stop() { - pullService.shutdown(); - log.info("DataLake feeder is stopped."); - return "DataLake feeder is stopped."; - } - /** - * @return feeder status - */ - @RequestMapping("/status") - public String status() { - String status = "to be impletemented"; - log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. 
- return status; - } -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 25028d58..c4aec14c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -21,36 +21,44 @@ package org.onap.datalake.feeder.controller; import java.io.IOException; import java.util.List; -import java.util.Optional; +import java.util.Set; +import javax.servlet.http.HttpServletResponse; + +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.DbService; import org.onap.datalake.feeder.service.DmaapService; +import org.onap.datalake.feeder.service.TopicService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; /** - * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as - * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is - * used for that topic. All the settings are saved in Couchbase. topic - * "_DL_DEFAULT_" is populated at setup by a DB script. + * This controller manages topic settings. + * + * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. + * All the settings are saved in database. + * topic "_DL_DEFAULT_" is populated at setup by a DB script. 
* * @author Guobiao Mo * */ @RestController -@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) +@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE}) public class TopicController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -60,7 +68,13 @@ public class TopicController { @Autowired private TopicRepository topicRepository; + + @Autowired + private TopicService topicService; + @Autowired + private DbService dbService; + //list all topics in DMaaP @GetMapping("/dmaap/") @ResponseBody @@ -77,33 +91,45 @@ public class TopicController { } //Read a topic - @GetMapping("/{name}") + @GetMapping("/{topicname}") @ResponseBody - public Topic getTopic(@PathVariable("name") String topicName) throws IOException { - //Topic topic = topicRepository.findFirstById(topicName); - Optional<Topic> topic = topicRepository.findById(topicName); - if (topic.isPresent()) { - return topic.get(); - } else { - return null; - } + public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException { + Topic topic = topicService.getTopic(topicName); + return topic; + } + + //Read DBs in a topic + @GetMapping("/{topicname}/dbs") + @ResponseBody + public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + return dbs; } //Update Topic + //This is not a partial update: old topic is wiped out, and new topic is created based on the input JSON. + //One exception is that old DBs are kept @PutMapping("/") @ResponseBody - public Topic updateTopic(Topic topic, BindingResult result) throws IOException { + public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { if (result.hasErrors()) { - log.error(result.toString()); - - return null;//TODO return binding error + sendError(response, 400, "Error parsing Topic: "+result.toString()); + return null; } - Topic oldTopic = getTopic(topic.getId()); + Topic oldTopic = getTopic(topic.getName()); if (oldTopic == null) { - return null;//TODO return not found error + sendError(response, 404, "Topic not found "+topic.getName()); + return null; } else { + if(!topic.isDefault()) { + Topic defaultTopic = topicService.getDefaultTopic(); + topic.setDefaultTopic(defaultTopic); + } + + topic.setDbs(oldTopic.getDbs()); topicRepository.save(topic); return topic; } @@ -112,20 +138,56 @@ public class TopicController { //create a new Topic @PostMapping("/") @ResponseBody - public Topic createTopic(Topic topic, BindingResult result) throws IOException { - + public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException { + if (result.hasErrors()) { - log.error(result.toString()); + sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - Topic oldTopic = getTopic(topic.getId()); + Topic oldTopic = getTopic(topic.getName()); if (oldTopic != null) { - return null;//TODO return 'already exists' error + sendError(response, 400, "Topic already exists "+topic.getName()); + return null; } else { + if(!topic.isDefault()) { + Topic defaultTopic = topicService.getDefaultTopic(); + topic.setDefaultTopic(defaultTopic); + } + + topicRepository.save(topic); + return topic; + } + } + //delete a db from the topic + @DeleteMapping("/{topicname}/db/{dbname}") + @ResponseBody + public Set<Db> deleteDb(@PathVariable("topicname")
String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + dbs.remove(new Db(dbName)); + + topicRepository.save(topic); + return topic.getDbs(); + } + + //add a db to the topic + @PutMapping("/{topicname}/db/{dbname}") + @ResponseBody + public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicName); + Set<Db> dbs = topic.getDbs(); + + Db db = dbService.getDb(dbName); + dbs.add(db); + + topicRepository.save(topic); + return topic.getDbs(); + } + + private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { + log.info(msg); + response.sendError(sc, msg); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java new file mode 100644 index 00000000..bbaedadc --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -0,0 +1,83 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.Set; + +import javax.persistence.CascadeType; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.ManyToMany; +import javax.persistence.Table; + +import com.fasterxml.jackson.annotation.JsonBackReference; + +import lombok.Getter; +import lombok.Setter; + +/** + * Domain class representing big data storage + * + * @author Guobiao Mo + * + */ +@Setter +@Getter +@Entity +@Table(name = "db") public class Db { + @Id + private String name; + + private String host; + private String login; + private String pass; + + private String property1; + private String property2; + private String property3; + + @JsonBackReference + @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL) + /* + @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="db_name") }, + inverseJoinColumns = { @JoinColumn(name="topic_name") } + ) */ + protected Set<Topic> topics; + + public Db() { + } + + public Db(String name) { + this.name = name; + } + + @Override + public boolean equals(Object obj) { + return name.equals(((Db)obj).getName()); + } + + @Override + public int hashCode() { + return name.hashCode(); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index ace33dcc..e1da4d4d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -19,39 +19,61 @@ */ package org.onap.datalake.feeder.domain; +import java.util.Set; import java.util.function.Predicate; -import javax.validation.constraints.NotNull; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.JoinTable; +import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; +import javax.persistence.Table; import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.onap.datalake.feeder.enumeration.DataFormat; -import org.springframework.data.annotation.Id; -import org.springframework.data.annotation.Transient; -import org.springframework.data.couchbase.core.mapping.Document; +import com.fasterxml.jackson.annotation.JsonBackReference; + +import lombok.Getter; import lombok.Setter; /** - * Domain class representing topic table in Couchbase + * Domain class representing topic * * @author Guobiao Mo * */ -@Document @Setter +@Getter +@Entity +@Table(name = "topic") public class Topic { - @NotNull @Id - private String id;//topic name + private String name;//topic name - @Transient + @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @JoinColumn(name = "default_topic", nullable = true) private Topic defaultTopic; //for protected Kafka topics private String login; private String pass; + //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) + @JsonBackReference + //@JsonManagedReference + @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="topic_name") }, + inverseJoinColumns = { @JoinColumn(name="db_name") } + 
) + protected Set<Db> dbs; + /** * indicate if we should monitor this topic */ @@ -60,20 +82,14 @@ public class Topic { /** * save raw message text */ + @Column(name = "save_raw") private Boolean saveRaw; /** - * true: save it to Elasticsearch false: don't save null: use default - */ - private Boolean supportElasticsearch; - private Boolean supportCouchbase; - private Boolean supportDruid; - - /** - * need to explicitly tell feeder the data format of the message + * need to explicitly tell feeder the data format of the message. * support JSON, XML, YAML, TEXT */ - private DataFormat dataFormat; + private String dataFormat; /** * TTL in day */ private Integer ttl; //if this flag is true, need to correlate alarm cleared message to previous alarm + @Column(name = "correlate_cleared_message") private Boolean correlateClearedMessage; //the value in the JSON with this path will be used as DB id + @Column(name = "message_id_path") private String messageIdPath; public Topic() { } - public Topic(String id) { - this.id = id; + public Topic(String name) { + this.name = name; } - public String getId() { - return id; + public boolean isDefault() { + return "_DL_DEFAULT_".equals(name); } - public void setDefaultTopic(Topic defaultTopic) { - this.defaultTopic = defaultTopic; - } - public boolean isEnabled() { return is(enabled, Topic::isEnabled); } @@ -121,7 +135,7 @@ public class Topic { public DataFormat getDataFormat() { if (dataFormat != null) { - return dataFormat; + return DataFormat.fromString(dataFormat); } else if (defaultTopic != null) { return defaultTopic.getDataFormat(); } else { @@ -148,24 +162,51 @@ public class Topic { return is(saveRaw, Topic::isSaveRaw); } - public boolean isSupportElasticsearch() { - return is(supportElasticsearch, Topic::isSupportElasticsearch); + public boolean supportElasticsearch() { + return containDb("Elasticsearch");//TODO string hard codes + } + + public boolean supportCouchbase() { + return containDb("Couchbase"); } - public boolean isSupportCouchbase() { - return is(supportCouchbase, Topic::isSupportCouchbase); + public boolean supportDruid() { + return containDb("Druid"); } - public boolean isSupportDruid() { - return is(supportDruid, Topic::isSupportDruid); + public boolean supportMongoDB() { + return containDb("MongoDB"); } - //extract DB id from a JSON attribute, TODO support multiple attributes + private boolean containDb(String dbName) { + Db db = new Db(dbName); + + if(dbs!=null && dbs.contains(db)) { + return true; + } + + if (defaultTopic != null) { + return defaultTopic.containDb(dbName); + } else { + return false; + } + } + + //extract DB id from JSON attributes, support multiple attributes + public String getMessageId(JSONObject json) { String id = null; if(StringUtils.isNotBlank(messageIdPath)) { - id = json.query(messageIdPath).toString(); + String[] paths=messageIdPath.split(","); + + StringBuilder sb= new StringBuilder(); + for(int i=0; i<paths.length; i++) { + if(i>0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); } return id; @@ -173,20 +214,17 @@ public class Topic { @Override public String toString() { - return id; + return name; } - /** - * @return the messageIdPath - */ - public String getMessageIdPath() { - return messageIdPath; + @Override + public boolean equals(Object obj) { + return name.equals(((Topic)obj).getName()); } - /** - * @param messageIdPath the messageIdPath to set - */ - public void setMessageIdPath(String messageIdPath) { - this.messageIdPath = messageIdPath; + 
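//Note: equals() and hashCode() use only the topic name, so Set membership and removal for Topic and Db are decided by name. + 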
@Override + public int hashCode() { + return name.hashCode(); } + } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java new file mode 100644 index 00000000..ae03f469 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java @@ -0,0 +1,36 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.Db; + +import org.springframework.data.repository.CrudRepository; + +/** + * + * Db Repository + * + * @author Guobiao Mo + * + */ + +public interface DbRepository extends CrudRepository { + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java index 37d1a669..2d9adef8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -20,48 +20,17 @@ package org.onap.datalake.feeder.repository; import org.onap.datalake.feeder.domain.Topic; -import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed; -import org.springframework.data.couchbase.core.query.Query; -import org.springframework.data.couchbase.core.query.ViewIndexed; -import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository; -import org.springframework.data.domain.Page; -import org.springframework.data.domain.Pageable; - -import java.util.List; +import org.springframework.data.repository.CrudRepository; /** * - * Topic Repository interface, implementation is taken care by Spring framework. - * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl. 
+ * Topic Repository * * @author Guobiao Mo * - */ -@ViewIndexed(designDoc = "topic", viewName = "all") -public interface TopicRepository extends CouchbasePagingAndSortingRepository, TopicRepositoryCustom { -/* - Topic findFirstById(String topic); - - Topic findByIdAndState(String topic, boolean state); - - //Supports native JSON query string - @Query("{topic:'?0'}") - Topic findTopicById(String topic); - - @Query("{topic: { $regex: ?0 } })") - List findTopicByRegExId(String topic); - - - //Page findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable); - - @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}") - Topic findByCompanyAndAreaId(String companyId, String areaId); + */ - @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END") - List findByPhoneNumber(String telephoneNumber); +public interface TopicRepository extends CrudRepository { - @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1") - Long countBuildings(String companyId); - */ } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java deleted file mode 100644 index 220a8f76..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java +++ /dev/null @@ -1,28 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DataLake -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-* ============LICENSE_END========================================================= -*/ -package org.onap.datalake.feeder.repository; - -/** - * @author Guobiao Mo - * - */ -public interface TopicRepositoryCustom { - long updateTopic(String topic, Boolean state); -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java deleted file mode 100644 index 018d5b95..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DataLake -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ -package org.onap.datalake.feeder.repository; - -import org.onap.datalake.feeder.domain.Topic; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.couchbase.core.CouchbaseTemplate; -/* -import org.springframework.data.mongodb.MongoDbFactory; -import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; -import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper; -import org.springframework.data.mongodb.core.convert.MappingMongoConverter; -import org.springframework.data.mongodb.core.mapping.MongoMappingContext; -import org.springframework.data.mongodb.core.query.Criteria; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.core.query.Update; - -import com.mongodb.WriteResult; -import com.mongodb.client.result.UpdateResult; -*/ -import java.util.List; - -/** - * @author Guobiao Mo - * - */ -public class TopicRepositoryImpl implements TopicRepositoryCustom { - - @Autowired - CouchbaseTemplate template; - - @Override - public long updateTopic(String topic, Boolean state) { -/* - Query query = new Query(Criteria.where("id").is(topic)); - Update update = new Update(); - update.set("state", state); - - UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class); - - if(result!=null) - return result.getModifiedCount(); - else - */ return 0L; - - - - } -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index 35432587..f74829e1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ 
b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -27,7 +27,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.json.JSONObject; -import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,18 +57,20 @@ public class CouchbaseService { private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired - private ApplicationConfiguration config; - + private DbService dbService; + Bucket bucket; @PostConstruct private void init() { // Initialize Couchbase Connection - Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost()); - cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass()); - bucket = cluster.openBucket(config.getCouchbaseBucket()); + + Db couchbase = dbService.getCouchbase(); + Cluster cluster = CouchbaseCluster.create(couchbase.getHost()); + cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); + bucket = cluster.openBucket(couchbase.getProperty1()); - log.info("Connect to Couchbase " + config.getCouchbaseHost()); + log.info("Connect to Couchbase " + couchbase.getHost()); // Create a N1QL Primary Index (but ignore if it exists) bucket.bucketManager().createN1qlPrimaryIndex(true, false); @@ -90,15 +92,21 @@ public class CouchbaseService { //setup TTL int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second - String id = getId(topic.getId()); + String id = getId(topic, json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } saveDocuments(documents); } - - private String getId(String topicStr) { + public String getId(Topic topic, JSONObject json) { + //if this topic requires extract id from JSON + String id = topic.getMessageId(json); + if(id != null) { + return id; + } + + String topicStr= topic.getName(); //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 @@ -106,7 +114,7 @@ public class CouchbaseService { // increment by 1, initialize at 0 if counter doc not found //TODO how slow is this compared with above UUID approach? JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 - String id = topicStr +":"+ nextIdNumber.content(); + id = topicStr +":"+ nextIdNumber.content(); return id; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java new file mode 100644 index 00000000..f0d943d3 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.Optional; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.repository.DbRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service for Dbs + * + * @author Guobiao Mo + * + */ +@Service +public class DbService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbRepository dbRepository; + + public Db getDb(String name) { + Optional ret = dbRepository.findById(name); + return ret.isPresent() ? ret.get() : null; + } + + public Db getCouchbase() { + return getDb("Couchbase"); + } + + public Db getElasticsearch() { + return getDb("Elasticsearch"); + } + + public Db getMongoDB() { + return getDb("MongoDB"); + } + + public Db getDruid() { + return getDb("Druid"); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index cbcc5f86..1f637e1a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,14 +62,18 @@ public class ElasticsearchService { @Autowired private ApplicationConfiguration config; + + @Autowired + private DbService dbService; private RestHighLevelClient client; ActionListener listener; @PostConstruct private void init() { - String elasticsearchHost = config.getElasticsearchHost(); - + Db elasticsearch = dbService.getElasticsearch(); + String elasticsearchHost = elasticsearch.getHost(); + // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); @@ -93,14 +99,16 @@ public class ElasticsearchService { public void ensureTableExist(String topic) throws IOException { String topicLower = 
topic.toLowerCase(); - - CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); - try { + + GetIndexRequest request = new GetIndexRequest(); + request.indices(topicLower); + + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); + if(!exists){ + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); - }catch(ElasticsearchStatusException e) { - log.info("{} create ES topic status: {}", topic, e.getDetailedMessage()); - } + } } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME @@ -114,7 +122,10 @@ public class ElasticsearchService { continue; } } - request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON)); + + String id = topic.getMessageId(json); //id can be null + + request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON)); } if(config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); @@ -122,19 +133,23 @@ public class ElasticsearchService { try { client.bulk(request, RequestOptions.DEFAULT); } catch (IOException e) { - log.error( topic.getId() , e); + log.error( topic.getName() , e); } } } private boolean correlateClearedMessage(JSONObject json) { - boolean found = false; - + boolean found = true; + /*TODO * 1. check if this is a alarm cleared message * 2. search previous alarm message * 3. update previous message, if success, set found=true */ + //for Sonar test, remove the following + if(json.isNull("kkkkk")) { + found = false; + } return found; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java index 3dcbd8ee..4433c8c4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -27,8 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import javax.annotation.PostConstruct; - import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,10 +56,13 @@ public class PullService { @Autowired private ApplicationConfiguration config; - @PostConstruct - private void init() { + /** + * @return the isRunning + */ + public boolean isRunning() { + return isRunning; } - + /** * start pulling. 
* @@ -109,6 +110,7 @@ public class PullService { executorService.awaitTermination(10L, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.error("executor.awaitTermination", e); + Thread.currentThread().interrupt(); } isRunning = false; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 1cd3a8a1..84e4fb7d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -152,11 +152,11 @@ public class StoreService { } private void saveJsons(Topic topic, List jsons) { - if (topic.isSupportCouchbase()) { + if (topic.supportCouchbase()) { couchbaseService.saveJsons(topic, jsons); } - if (topic.isSupportElasticsearch()) { + if (topic.supportElasticsearch()) { elasticsearchService.saveJsons(topic, jsons); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 9b8fabc1..4e10a365 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -34,8 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service for topics topic setting is stored in Couchbase, bucket 'dl', see - * application.properties for Spring setup + * Service for topics * * @author Guobiao Mo * @@ -68,15 +67,14 @@ public class TopicService { return null; } + //TODO caller should not modify the returned topic, maybe return a clone public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { Topic topic = getTopic(topicStr); if (topic == null) { - topic = new Topic(topicStr); + topic = getDefaultTopic(); } - - topic.setDefaultTopic(getDefaultTopic()); - if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { + if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { elasticsearchService.ensureTableExist(topicStr); } return topic; diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index a0ab90fe..ea94d004 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -2,6 +2,16 @@ server.port = 1680 +# Spring connection to MariaDB for ORM +#spring.jpa.hibernate.ddl-auto=update +spring.jpa.hibernate.ddl-auto=none +spring.jpa.show-sql=false + +#spring.datasource.driver-class-name=com.mysql.jdbc.Driver +spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8 +spring.datasource.username=nook +spring.datasource.password=nook123 + #For Beijing lab #dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 @@ -30,24 +40,3 @@ logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG -# Spring connection to Couchbase DB for ORM - -#not work when run in osboxes, but works in Eclipse 
-spring.couchbase.bootstrap-hosts=dl_couchbase -#spring.couchbase.bootstrap-hosts=172.30.1.74 - -#a user with name as bucket.name must be created, with the pass as bucket.password -# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase -spring.couchbase.bucket.name=dl -spring.couchbase.bucket.password=dl1234 -spring.data.couchbase.auto-index=true - -#DL Feeder DB: Couchbase -couchbaseHost=dl_couchbase -#couchbaseHost=172.30.1.74 -couchbaseUser=dmaap -couchbasePass=dmaap1234 -couchbaseBucket=dmaap - -#DL Feeder DB: Elasticsearch -elasticsearchHost=dl_es diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json index e536c7b3..a20e5eb3 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json @@ -227,7 +227,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json new file mode 100644 index 00000000..cb3c98da --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json @@ -0,0 +1,52 @@ +{ + "_id": "5bb3dfae5bea3f1cb49d4f3f", + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "esr-thirdparty-sdnc", + "top-entity-type": "esr-thirdparty-sdnc", + "entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "CREATE", + "sequence-number": "0", + "id": "69f2935f-c3c1-4a63-b3f1-519c3898328b", + "source-name": "ying", + "version": "v11", + "timestamp": "20180919-06:20:44:216" + }, + "entity": { + "thirdparty-sdnc-id": "SDWANController1", + "resource-version": "1537338043473", + "location": "Core", + "product-name": "SD-WAN", + "esr-system-info-list": { + "esr-system-info": [ + { + "esr-system-info-id": "SDWANController1-ESR-1", + "system-type": "example-system-type-val-12078", + "service-url": "https://172.19.48.77:18008", + "ssl-cacert": "example-ssl-cacert-val-20589", + "type": "WAN", + "ssl-insecure": true, + "system-status": "example-system-status-val-23435", + "version": "V3R1", + "passive": true, + "password": "Onap@12345", + "protocol": "RESTCONF", + "ip-address": "172.19.48.77", + "cloud-domain": "example-cloud-domain-val-76077", + "user-name": "northapi1@huawei.com", + "system-name": "SDWANController", + "port": "18008", + "vendor": "IP-WAN", + "resource-version": "1537338044166", + "remote-path": "example-remotepath-val-5833", + "default-tenant": "example-default-tenant-val-71148" + } + ] + } + }, + "_dl_type_": "JSON", + "_dl_text_": 
"{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}" +} \ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json index 16eb1636..19bf6ed5 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json @@ -142,7 +142,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json index 3c871a87..6797b19e 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json @@ -200,7 +200,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json index c3f60372..f910acee 100644 --- a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json +++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json @@ -302,7 +302,7 @@ "taskDuration": "PT1H", "completionTimeout": "PT30M", "consumerProperties": { - "bootstrap.servers": "dl_dmaap:9092" + "bootstrap.servers": "message-router-kafka:9092" }, "useEarliestOffset": true } diff --git 
a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json new file mode 100644 index 00000000..957060f4 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json @@ -0,0 +1,75 @@ +{ + "_id": { + "$oid": "5bb3d3aa5bea3f41300957c6" + }, + "sendEpsShort": { + "summary": "0.0 send eps (10 mins)", + "raw": 0 + }, + "recvEpsInstant": { + "summary": "0.03333333333333333 recv eps (1 min)", + "raw": 0.03333333333333333 + }, + "fanOut": { + "summary": "0.7051873815491838 sends per recv", + "raw": 0.7051873815491838 + }, + "sendEpsLong": { + "summary": "0.0 send eps (1 hr)", + "raw": 0 + }, + "kafkaConsumerTimeouts": { + "summary": "164 Kafka Consumers Timedout", + "raw": 164 + }, + "recvEpsLong": { + "summary": "0.03333333333333333 recv eps (1 hr)", + "raw": 0.03333333333333333 + }, + "sendEpsInstant": { + "summary": "0.0 send eps (1 min)", + "raw": 0 + }, + "recvEpsShort": { + "summary": "0.03333333333333333 recv eps (10 mins)", + "raw": 0.03333333333333333 + }, + "kafkaConsumerClaims": { + "summary": "1 Kafka Consumers Claimed", + "raw": 1 + }, + "version": { + "summary": "Version 1.1.3", + "raw": 0 + }, + "upTime": { + "summary": "604800 seconds since start", + "raw": 604800 + }, + "sendTotalEvents": { + "summary": "46139 Total events sent since start", + "raw": 46139 + }, + "hostname": "3d5704fccbc5", + "kafkaConsumerCacheMiss": { + "summary": "179 Kafka Consumer Cache Misses", + "raw": 179 + }, + "metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC", + "kafkaConsumerCacheHit": { + "summary": "317143 Kafka Consumer Cache Hits", + "raw": 317143 + }, + "now": 1537639709380, + "transactionEnabled": false, + "startTime": { + "summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC", + "raw": 1537034908 + }, + "recvTotalEvents": { + "summary": "65428 Total events received since start", + "raw": 65428 + }, + "_dl_type_": "JSON", + "_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 
2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}" +} diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json new file mode 100644 index 00000000..8de08f76 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json @@ -0,0 +1,25 @@ +{ + "_id": "5bb3d3ad5bea3f41300959ba", + "closedLoopEventClient": "DCAE.HolmesInstance", + "policyVersion": "1.0.0.5", + "policyName": "CCVPN", + "policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8", + "target_type": "VM", + "AAI": { + "serviceType": "TestService", + "service-instance_service-instance-id": "200", + "globalSubscriberId": "Customer1", + "vserver_vserver-name": "TBD", + "network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200" + }, + "closedLoopAlarmStart": "1532769303924000", + "closedLoopEventStatus": "ONSET", + "version": "1.0.2", + "closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b", + "target": "vserver.vserver-name", + "closedLoopAlarmEnd": "1532769303924000", + "requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87", + "from": "DCAE", + "_dl_type_": "JSON", + "_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}" +} \ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json new file mode 100644 index 00000000..bb506d54 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json @@ -0,0 +1,57 @@ +{ + "_id": "5bb3d3b45bea3f41300960f8", + "event": { + "commonEventHeader": { + "sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27", + "startEpochMicrosec": "1537438335829000", + "eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857", + "domain": "fault", + "lastEpochMicrosec": 1537438335829000, + "eventName": "Fault_Route_Status", + "sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27", + "priority": "High", + "version": 3, + "reportingEntityName": "Domain_Contorller" + }, + "faultFields": { + "eventSeverity": "CRITICAL", + "alarmCondition": "Route_Status", + "faultFieldsVersion": 2, + 
"specificProblem": "Fault_SOTN_Service_Status", + "alarmAdditionalInformation": [ + { + "name": "networkId", + "value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100" + }, + { + "name": "node", + "value": "11.11.11.11" + }, + { + "name": "tp-id", + "value": "27" + }, + { + "name": "oper-status", + "value": "down" + }, + { + "name": "network-ref", + "value": "providerId/5555/clientId/6666/topologyId/33" + }, + { + "name": "node-ref", + "value": "0.51.0.103" + }, + { + "name": "tp-ref", + "value": "4" + } + ], + "eventSourceType": "other", + "vfStatus": "Active" + } + }, + "_dl_type_": "JSON", + "_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}" +} \ No newline at end of file diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java index 934451fe..02db5a25 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -56,10 +56,6 @@ public class ApplicationConfigurationTest { @Test public void readConfig() { - assertNotNull(config.getCouchbaseHost()); - assertNotNull(config.getCouchbaseUser()); - assertNotNull(config.getCouchbasePass()); - assertNotNull(config.getCouchbaseBucket()); assertNotNull(config.getDmaapZookeeperHostPort()); assertNotNull(config.getDmaapKafkaHostPort()); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java new file mode 100644 index 00000000..ea1d6894 --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java @@ -0,0 +1,44 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in 
compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import org.junit.Test; + +/** + * Test Db + * + * @author Guobiao Mo + * + */ + +public class DbTest { + + @Test + public void testIs() { + Db couchbase=new Db("Couchbase"); + Db mongoDB=new Db("MongoDB"); + Db mongoDB2=new Db("MongoDB"); + assertNotEquals(couchbase.hashCode(), mongoDB.hashCode()); + assertNotEquals(couchbase, mongoDB); + assertEquals(mongoDB, mongoDB2); + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java index 23ec3b10..1e402522 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -23,8 +23,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.HashSet; + import org.json.JSONObject; -import org.junit.Test; +import org.junit.Test; +import org.onap.datalake.feeder.enumeration.DataFormat; /** * Test Topic @@ -49,18 +52,50 @@ public class TopicTest { assertEquals(value, "hello"); } + @Test + public void getMessageIdFromMultipleAttributes() { + String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}"; + + JSONObject json = new JSONObject(text); + + Topic topic = new Topic("test getMessageId"); + topic.setMessageIdPath("/data/data2/value,/data/data3"); + + String value = topic.getMessageId(json); + + assertEquals(value, "hello^world"); + } + @Test public void testIs() { - Topic defaultTopic=new Topic("default"); + Topic defaultTopic=new Topic("_DL_DEFAULT_"); Topic testTopic = new Topic("test"); testTopic.setDefaultTopic(defaultTopic); + + assertTrue(defaultTopic.isDefault()); + assertFalse(testTopic.isDefault()); + + assertTrue(testTopic.equals(new Topic("test"))); + assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); + + defaultTopic.setDbs(new HashSet<>()); + defaultTopic.getDbs().add(new Db("Elasticsearch")); + assertTrue(testTopic.supportElasticsearch()); + assertFalse(testTopic.supportCouchbase()); + assertFalse(testTopic.supportDruid()); + assertFalse(testTopic.supportMongoDB()); + + defaultTopic.getDbs().remove(new Db("Elasticsearch")); + assertFalse(testTopic.supportElasticsearch()); - defaultTopic.setSupportElasticsearch(true); - boolean b = testTopic.isSupportElasticsearch(); - assertTrue(b); + defaultTopic.setCorrelateClearedMessage(true); + defaultTopic.setDataFormat("XML"); + defaultTopic.setEnabled(true); + defaultTopic.setSaveRaw(true); + assertTrue(testTopic.isCorrelateClearedMessage()); + assertTrue(testTopic.isEnabled()); + assertTrue(testTopic.isSaveRaw()); - defaultTopic.setSupportElasticsearch(false); - b = testTopic.isSupportElasticsearch(); - 
assertFalse(b); + assertEquals(defaultTopic.getDataFormat(), DataFormat.XML); } } diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties index ede5999b..d6d98e64 100644 --- a/components/datalake-handler/feeder/src/test/resources/application.properties +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -2,14 +2,6 @@ server.port = 1680 - -#For Beijing lab -#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 -#dmaapKafkaHostPort=kafka.mr01.onap.vip:80 -#spring.couchbase.bootstrap-hosts=172.30.1.74 -#couchbaseHost=172.30.1.74 - - #DMaaP #dmaapZookeeperHostPort=127.0.0.1:2181 #dmaapKafkaHostPort=127.0.0.1:9092 @@ -30,13 +22,4 @@ logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG - -#DL Feeder DB: Couchbase -couchbaseHost=dl_couchbase -#couchbaseHost=172.30.1.74 -couchbaseUser=dmaap -couchbasePass=dmaap1234 -couchbaseBucket=dmaap - -#DL Feeder DB: Elasticsearch -elasticsearchHost=dl_es + diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml index ede2a27b..a526cb54 100644 --- a/components/datalake-handler/pom.xml +++ b/components/datalake-handler/pom.xml @@ -31,13 +31,19 @@ 3.1.2.RELEASE 2.9.6 2.0.0 - 6.6.0 + 6.7.0 + + org.mariadb.jdbc + mariadb-java-client + 2.4.1 + + commons-io commons-io @@ -142,6 +148,13 @@ ${springboot.version} + + + org.springframework.boot + spring-boot-starter-data-jpa + ${springboot.version} + + org.springframework.boot spring-boot-starter-data-couchbase -- cgit 1.2.3-korg
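
The ElasticsearchService change above replaces the catch of ElasticsearchStatusException with an explicit existence check before index creation. For reference, the same high-level REST client calls can be exercised standalone; this is a minimal sketch assuming an Elasticsearch 6.x RestHighLevelClient, where the host name "es-host" and the sample index name are illustrative and not taken from the patch.

import java.io.IOException;

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EnsureIndexSketch {
    public static void main(String[] args) throws IOException {
        // Illustrative host; the feeder takes its Elasticsearch host from configuration.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("es-host", 9200, "http")))) {

            // Index names are lower-cased topic names; this one is only an example.
            String index = "unauthenticated.sec_fault_output";

            // Create the index only when it does not exist yet, mirroring ensureTableExist().
            GetIndexRequest getRequest = new GetIndexRequest();
            getRequest.indices(index);
            boolean exists = client.indices().exists(getRequest, RequestOptions.DEFAULT);
            if (!exists) {
                client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
            }
        }
    }
}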
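
The new message-id handling is visible here only through its test: getMessageIdFromMultipleAttributes expects a messageIdPath of "/data/data2/value,/data/data3" to yield "hello^world", and the resulting id is then passed to IndexRequest so that a re-processed message updates the same Elasticsearch document instead of creating a duplicate. A sketch of that behaviour with org.json, not necessarily the actual Topic.getMessageId implementation; the class and method names below are assumptions.

import org.json.JSONObject;

public class MessageIdSketch {

    // Joins the values found at each comma-separated JSON Pointer in
    // messageIdPath with '^', matching the expectation in TopicTest.
    static String extractMessageId(JSONObject json, String messageIdPath) {
        String[] paths = messageIdPath.split(",");
        StringBuilder id = new StringBuilder();
        for (int i = 0; i < paths.length; i++) {
            if (i > 0) {
                id.append('^');
            }
            id.append(json.query(paths[i]).toString()); // JSON Pointer lookup
        }
        return id.toString();
    }

    public static void main(String[] args) {
        JSONObject json = new JSONObject("{ data: { data2 : { value : 'hello'}, data3 : 'world'}}");
        // Prints "hello^world", as asserted in TopicTest.
        System.out.println(extractMessageId(json, "/data/data2/value,/data/data3"));
    }
}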
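
With the Couchbase ORM settings removed, the feeder now reads its configuration from the MariaDB instance described by the new spring.datasource.* properties. A quick standalone connectivity check with the mariadb-java-client driver might look like the sketch below; the URL and credentials are the defaults from application.properties and would normally be overridden for a real deployment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class MariaDbCheck {
    public static void main(String[] args) throws SQLException {
        // Same URL and credentials as the defaults in application.properties.
        String url = "jdbc:mariadb://dl_mariadb:3306/datalake"
                + "?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";
        try (Connection conn = DriverManager.getConnection(url, "nook", "nook123");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                // A returned row confirms the datasource settings are reachable.
                System.out.println("MariaDB reachable, SELECT 1 = " + rs.getInt(1));
            }
        }
    }
}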