summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2019-04-08 16:23:35 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-08 16:23:35 +0000
commitf4394959f7803c351543873d038af48eb91075f8 (patch)
tree7d0e7f8a3a4f372b1f005319a0e45bc132b2bcf4
parent3601ddf5c10e2a0dfcbbf854fd40cfbbc14074f0 (diff)
parent8dc9d71a2465f5c1e4beb52c2375efe02bcde174 (diff)
Merge "Use MariaDB to store application configurations"
-rw-r--r--components/datalake-handler/feeder/pom.xml15
-rw-r--r--components/datalake-handler/feeder/src/assembly/docker-compose.yml10
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql54
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java20
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java135
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java)13
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java114
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java83
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java130
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java)14
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java39
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java67
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java30
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java67
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java39
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java12
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java10
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties31
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json2
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json52
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json2
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json2
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json2
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json75
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json25
-rw-r--r--components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json57
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java4
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java44
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java51
-rw-r--r--components/datalake-handler/feeder/src/test/resources/application.properties19
-rw-r--r--components/datalake-handler/pom.xml15
32 files changed, 930 insertions, 307 deletions
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml
index f88baf36..5b47a245 100644
--- a/components/datalake-handler/feeder/pom.xml
+++ b/components/datalake-handler/feeder/pom.xml
@@ -10,13 +10,19 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.onap.datalake</groupId>
+ <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
<artifactId>feeder</artifactId>
<packaging>jar</packaging>
<name>DataLake Feeder</name>
<dependencies>
+
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@@ -42,7 +48,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
-
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>
diff --git a/components/datalake-handler/feeder/src/assembly/docker-compose.yml b/components/datalake-handler/feeder/src/assembly/docker-compose.yml
deleted file mode 100644
index 7ca466b6..00000000
--- a/components/datalake-handler/feeder/src/assembly/docker-compose.yml
+++ /dev/null
@@ -1,10 +0,0 @@
-version: '2'
-services:
-
- datalake:
- image: moguobiao/datalake-storage
- container_name: datalake-storage
- environment:
- - no-needed-dmaapHost=10.0.2.15:3904
- ports:
- - "1680:1680"
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
new file mode 100644
index 00000000..2185320a
--- /dev/null
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -0,0 +1,54 @@
+create database datalake;
+use datalake;
+
+CREATE TABLE `topic` (
+ `name` varchar(255) NOT NULL,
+ `correlate_cleared_message` bit(1) DEFAULT NULL,
+ `enabled` bit(1) DEFAULT NULL,
+ `login` varchar(255) DEFAULT NULL,
+ `message_id_path` varchar(255) DEFAULT NULL,
+ `pass` varchar(255) DEFAULT NULL,
+ `save_raw` bit(1) DEFAULT NULL,
+ `ttl` int(11) DEFAULT NULL,
+ `data_format` varchar(255) DEFAULT NULL,
+ `default_topic` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`name`),
+ KEY `FK_default_topic` (`default_topic`),
+ CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+CREATE TABLE `db` (
+ `name` varchar(255) NOT NULL,
+ `host` varchar(255) DEFAULT NULL,
+ `login` varchar(255) DEFAULT NULL,
+ `pass` varchar(255) DEFAULT NULL,
+ `property1` varchar(255) DEFAULT NULL,
+ `property2` varchar(255) DEFAULT NULL,
+ `property3` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+CREATE TABLE `map_db_topic` (
+ `db_name` varchar(255) NOT NULL,
+ `topic_name` varchar(255) NOT NULL,
+ PRIMARY KEY (`db_name`,`topic_name`),
+ KEY `FK_topic_name` (`topic_name`),
+ CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`),
+ CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');
+insert into db (name,host) values ('Elasticsearch','dl_es');
+insert into db (name,host) values ('MongoDB','dl_mongodb');
+insert into db (name,host) values ('Druid','dl_druid');
+
+
+-- in production, default enabled should be off
+insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
+insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
+
+
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 108eb4e0..1136e304 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -20,13 +20,9 @@
package org.onap.datalake.feeder.config;
-import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.ComponentScan;
import lombok.Getter;
import lombok.Setter;
@@ -42,34 +38,18 @@ import lombok.Setter;
@Setter
@SpringBootConfiguration
@ConfigurationProperties
-//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
-//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
@EnableAutoConfiguration
-//@Profile("test")
public class ApplicationConfiguration {
- private String couchbaseHost;
- private String couchbaseUser;
- private String couchbasePass;
- private String couchbaseBucket;
-
- // private int mongodbPort;
- // private String mongodbDatabase;
-
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
private long dmaapKafkaTimeout;
-// private boolean dmaapMonitorAllTopics;
private int dmaapCheckNewTopicIntervalInSec;
- //private String dmaapHostPort;
- //private Set<String> dmaapExcludeTopics;
- //private Set<String> dmaapIncludeTopics;
private int kafkaConsumerCount;
private boolean async;
- private String elasticsearchHost;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
new file mode 100644
index 00000000..c34befcc
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -0,0 +1,135 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.service.DbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages the big data storage settings. All the settings are saved in database.
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DbController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ @Autowired
+ private DbService dbService;
+
+ //list all dbs
+ @GetMapping("/")
+ @ResponseBody
+ public Iterable<Db> list() throws IOException {
+ Iterable<Db> ret = dbRepository.findAll();
+ return ret;
+ }
+
+ //Read a db
+ //the topics are missing in the return, since in we use @JsonBackReference on Db's topics
+ //need to the the following method to retrieve the topic list
+ @GetMapping("/{name}")
+ @ResponseBody
+ public Db getDb(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ return db;
+ }
+
+ //Read topics in a DB
+ @GetMapping("/{name}/topics")
+ @ResponseBody
+ public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ Set<Topic> topics = db.getTopics();
+ return topics;
+ }
+
+ //Update Db
+ @PutMapping("/")
+ @ResponseBody
+ public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb == null) {
+ sendError(response, 404, "Db not found: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ //create a new Db
+ @PostMapping("/")
+ @ResponseBody
+ public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb != null) {
+ sendError(response, 400, "Db already exists: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 2b176370..2e13e1af 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -37,8 +38,8 @@ import org.springframework.web.bind.annotation.RestController;
*/
@RestController
-@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
-public class PullController {
+@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class FeederController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -49,7 +50,7 @@ public class PullController {
* @return message that application is started
* @throws IOException
*/
- @RequestMapping("/start")
+ @GetMapping("/start")
public String start() throws IOException {
log.info("DataLake feeder starting to pull data from DMaaP...");
pullService.start();
@@ -59,7 +60,7 @@ public class PullController {
/**
* @return message that application stop process is triggered
*/
- @RequestMapping("/stop")
+ @GetMapping("/stop")
public String stop() {
pullService.shutdown();
log.info("DataLake feeder is stopped.");
@@ -68,9 +69,9 @@ public class PullController {
/**
* @return feeder status
*/
- @RequestMapping("/status")
+ @GetMapping("/status")
public String status() {
- String status = "to be impletemented";
+ String status = "Feeder is running: "+pullService.isRunning();
log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
return status;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 25028d58..c4aec14c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -21,36 +21,44 @@ package org.onap.datalake.feeder.controller;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
+import java.util.Set;
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.TopicService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
- * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
- * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
- * used for that topic. All the settings are saved in Couchbase. topic
- * "_DL_DEFAULT_" is populated at setup by a DB script.
+ * This controller manages topic settings.
+ *
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
+ * All the settings are saved in database.
+ * topic "_DL_DEFAULT_" is populated at setup by a DB script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -60,7 +68,13 @@ public class TopicController {
@Autowired
private TopicRepository topicRepository;
+
+ @Autowired
+ private TopicService topicService;
+ @Autowired
+ private DbService dbService;
+
//list all topics in DMaaP
@GetMapping("/dmaap/")
@ResponseBody
@@ -77,33 +91,45 @@ public class TopicController {
}
//Read a topic
- @GetMapping("/{name}")
+ @GetMapping("/{topicname}")
@ResponseBody
- public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
- //Topic topic = topicRepository.findFirstById(topicName);
- Optional<Topic> topic = topicRepository.findById(topicName);
- if (topic.isPresent()) {
- return topic.get();
- } else {
- return null;
- }
+ public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ return topic;
+ }
+
+ //Read DBs in a topic
+ @GetMapping("/{topicname}/dbs")
+ @ResponseBody
+ public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ return dbs;
}
//Update Topic
+ //This is not a partial update: old topic is wiped out, and new topic is created base on the input json.
+ //One exception is that old DBs are kept
@PutMapping("/")
@ResponseBody
- public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+ public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- log.error(result.toString());
-
- return null;//TODO return binding error
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
+ return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- return null;//TODO return not found error
+ sendError(response, 404, "Topic not found "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
+ topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
@@ -112,20 +138,56 @@ public class TopicController {
//create a new Topic
@PostMapping("/")
@ResponseBody
- public Topic createTopic(Topic topic, BindingResult result) throws IOException {
-
+ public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+
if (result.hasErrors()) {
- log.error(result.toString());
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- return null;//TODO return 'already exists' error
+ sendError(response, 400, "Topic already exists "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
topicRepository.save(topic);
return topic;
}
}
+ //delete a db from the topic
+ @DeleteMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ dbs.remove(new Db(dbName));
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ //add a db to the topic
+ @PutMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+
+ Db db = dbService.getDb(dbName);
+ dbs.add(db);
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
new file mode 100644
index 00000000..bbaedadc
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -0,0 +1,83 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Domain class representing bid data storage
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db")
+public class Db {
+ @Id
+ private String name;
+
+ private String host;
+ private String login;
+ private String pass;
+
+ private String property1;
+ private String property2;
+ private String property3;
+
+ @JsonBackReference
+ @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+ /*
+ @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="db_name") },
+ inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ ) */
+ protected Set<Topic> topics;
+
+ public Db() {
+ }
+
+ public Db(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Db)obj).getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index ace33dcc..e1da4d4d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -19,39 +19,61 @@
*/
package org.onap.datalake.feeder.domain;
+import java.util.Set;
import java.util.function.Predicate;
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
import lombok.Setter;
/**
- * Domain class representing topic table in Couchbase
+ * Domain class representing topic
*
* @author Guobiao Mo
*
*/
-@Document
@Setter
+@Getter
+@Entity
+@Table(name = "topic")
public class Topic {
- @NotNull
@Id
- private String id;//topic name
+ private String name;//topic name
- @Transient
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
//for protected Kafka topics
private String login;
private String pass;
+ //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+ @JsonBackReference
+ //@JsonManagedReference
+ @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="topic_name") },
+ inverseJoinColumns = { @JoinColumn(name="db_name") }
+ )
+ protected Set<Db> dbs;
+
/**
* indicate if we should monitor this topic
*/
@@ -60,20 +82,14 @@ public class Topic {
/**
* save raw message text
*/
+ @Column(name = "save_raw")
private Boolean saveRaw;
/**
- * true: save it to Elasticsearch false: don't save null: use default
- */
- private Boolean supportElasticsearch;
- private Boolean supportCouchbase;
- private Boolean supportDruid;
-
- /**
- * need to explicitly tell feeder the data format of the message
+ * need to explicitly tell feeder the data format of the message.
* support JSON, XML, YAML, TEXT
*/
- private DataFormat dataFormat;
+ private String dataFormat;
/**
* TTL in day
@@ -81,26 +97,24 @@ public class Topic {
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
+ @Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
//the value in the JSON with this path will be used as DB id
+ @Column(name = "message_id_path")
private String messageIdPath;
public Topic() {
}
- public Topic(String id) {
- this.id = id;
+ public Topic(String name) {
+ this.name = name;
}
- public String getId() {
- return id;
+ public boolean isDefault() {
+ return "_DL_DEFAULT_".equals(name);
}
- public void setDefaultTopic(Topic defaultTopic) {
- this.defaultTopic = defaultTopic;
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
@@ -121,7 +135,7 @@ public class Topic {
public DataFormat getDataFormat() {
if (dataFormat != null) {
- return dataFormat;
+ return DataFormat.fromString(dataFormat);
} else if (defaultTopic != null) {
return defaultTopic.getDataFormat();
} else {
@@ -148,24 +162,51 @@ public class Topic {
return is(saveRaw, Topic::isSaveRaw);
}
- public boolean isSupportElasticsearch() {
- return is(supportElasticsearch, Topic::isSupportElasticsearch);
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO string hard codes
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
}
- public boolean isSupportCouchbase() {
- return is(supportCouchbase, Topic::isSupportCouchbase);
+ public boolean supportDruid() {
+ return containDb("Druid");
}
- public boolean isSupportDruid() {
- return is(supportDruid, Topic::isSupportDruid);
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
}
- //extract DB id from a JSON attribute, TODO support multiple attributes
+ private boolean containDb(String dbName) {
+ Db db = new Db(dbName);
+
+ if(dbs!=null && dbs.contains(db)) {
+ return true;
+ }
+
+ if (defaultTopic != null) {
+ return defaultTopic.containDb(dbName);
+ } else {
+ return false;
+ }
+ }
+
+ //extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
String id = null;
if(StringUtils.isNotBlank(messageIdPath)) {
- id = json.query(messageIdPath).toString();
+ String[] paths=messageIdPath.split(",");
+
+ StringBuilder sb= new StringBuilder();
+ for(int i=0; i<paths.length; i++) {
+ if(i>0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
}
return id;
@@ -173,20 +214,17 @@ public class Topic {
@Override
public String toString() {
- return id;
+ return name;
}
- /**
- * @return the messageIdPath
- */
- public String getMessageIdPath() {
- return messageIdPath;
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Topic)obj).getName());
}
- /**
- * @param messageIdPath the messageIdPath to set
- */
- public void setMessageIdPath(String messageIdPath) {
- this.messageIdPath = messageIdPath;
+ @Override
+ public int hashCode() {
+ return name.hashCode();
}
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
index 220a8f76..ae03f469 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
@@ -19,10 +19,18 @@
*/
package org.onap.datalake.feeder.repository;
+import org.onap.datalake.feeder.domain.Db;
+
+import org.springframework.data.repository.CrudRepository;
+
/**
+ *
+ * Db Repository
+ *
* @author Guobiao Mo
*
- */
-public interface TopicRepositoryCustom {
- long updateTopic(String topic, Boolean state);
+ */
+
+public interface DbRepository extends CrudRepository<Db, String> {
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 37d1a669..2d9adef8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -20,48 +20,17 @@
package org.onap.datalake.feeder.repository;
import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
-import org.springframework.data.couchbase.core.query.Query;
-import org.springframework.data.couchbase.core.query.ViewIndexed;
-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.Pageable;
-
-import java.util.List;
+import org.springframework.data.repository.CrudRepository;
/**
*
- * Topic Repository interface, implementation is taken care by Spring framework.
- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl.
+ * Topic Repository
*
* @author Guobiao Mo
*
- */
-@ViewIndexed(designDoc = "topic", viewName = "all")
-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {
-/*
- Topic findFirstById(String topic);
-
- Topic findByIdAndState(String topic, boolean state);
-
- //Supports native JSON query string
- @Query("{topic:'?0'}")
- Topic findTopicById(String topic);
-
- @Query("{topic: { $regex: ?0 } })")
- List<Topic> findTopicByRegExId(String topic);
-
-
- //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);
-
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")
- Topic findByCompanyAndAreaId(String companyId, String areaId);
+ */
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")
- List<Topic> findByPhoneNumber(String telephoneNumber);
+public interface TopicRepository extends CrudRepository<Topic, String> {
- @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")
- Long countBuildings(String companyId);
- */
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
deleted file mode 100644
index 018d5b95..00000000
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DataLake
-* ================================================================================
-* Copyright 2019 China Mobile
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-package org.onap.datalake.feeder.repository;
-
-import org.onap.datalake.feeder.domain.Topic;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.couchbase.core.CouchbaseTemplate;
-/*
-import org.springframework.data.mongodb.MongoDbFactory;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
-import org.springframework.data.mongodb.core.query.Criteria;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.data.mongodb.core.query.Update;
-
-import com.mongodb.WriteResult;
-import com.mongodb.client.result.UpdateResult;
-*/
-import java.util.List;
-
-/**
- * @author Guobiao Mo
- *
- */
-public class TopicRepositoryImpl implements TopicRepositoryCustom {
-
- @Autowired
- CouchbaseTemplate template;
-
- @Override
- public long updateTopic(String topic, Boolean state) {
-/*
- Query query = new Query(Criteria.where("id").is(topic));
- Update update = new Update();
- update.set("state", state);
-
- UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);
-
- if(result!=null)
- return result.getModifiedCount();
- else
- */ return 0L;
-
-
-
- }
-}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 35432587..f74829e1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -27,7 +27,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.json.JSONObject;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,18 +57,20 @@ public class CouchbaseService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private ApplicationConfiguration config;
-
+ private DbService dbService;
+
Bucket bucket;
@PostConstruct
private void init() {
// Initialize Couchbase Connection
- Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
- cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
- bucket = cluster.openBucket(config.getCouchbaseBucket());
+
+ Db couchbase = dbService.getCouchbase();
+ Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
+ cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+ bucket = cluster.openBucket(couchbase.getProperty1());
- log.info("Connect to Couchbase " + config.getCouchbaseHost());
+ log.info("Connect to Couchbase " + couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
@@ -90,15 +92,21 @@ public class CouchbaseService {
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
- String id = getId(topic.getId());
+ String id = getId(topic, json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
saveDocuments(documents);
}
-
- private String getId(String topicStr) {
+ public String getId(Topic topic, JSONObject json) {
+ //if this topic requires extract id from JSON
+ String id = topic.getMessageId(json);
+ if(id != null) {
+ return id;
+ }
+
+ String topicStr= topic.getName();
//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -106,7 +114,7 @@ public class CouchbaseService {
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- String id = topicStr +":"+ nextIdNumber.content();
+ id = topicStr +":"+ nextIdNumber.content();
return id;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
new file mode 100644
index 00000000..f0d943d3
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Dbs
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DbService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ public Db getDb(String name) {
+ Optional<Db> ret = dbRepository.findById(name);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public Db getCouchbase() {
+ return getDb("Couchbase");
+ }
+
+ public Db getElasticsearch() {
+ return getDb("Elasticsearch");
+ }
+
+ public Db getMongoDB() {
+ return getDb("MongoDB");
+ }
+
+ public Db getDruid() {
+ return getDb("Druid");
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index cbcc5f86..1f637e1a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,14 +62,18 @@ public class ElasticsearchService {
@Autowired
private ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
@PostConstruct
private void init() {
- String elasticsearchHost = config.getElasticsearchHost();
-
+ Db elasticsearch = dbService.getElasticsearch();
+ String elasticsearchHost = elasticsearch.getHost();
+
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
@@ -93,14 +99,16 @@ public class ElasticsearchService {
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
-
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
- try {
+
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(topicLower);
+
+ boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+ if(!exists){
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }catch(ElasticsearchStatusException e) {
- log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
@@ -114,7 +122,10 @@ public class ElasticsearchService {
continue;
}
}
- request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+
+ String id = topic.getMessageId(json); //id can be null
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -122,19 +133,23 @@ public class ElasticsearchService {
try {
client.bulk(request, RequestOptions.DEFAULT);
} catch (IOException e) {
- log.error( topic.getId() , e);
+ log.error( topic.getName() , e);
}
}
}
private boolean correlateClearedMessage(JSONObject json) {
- boolean found = false;
-
+ boolean found = true;
+
/*TODO
* 1. check if this is a alarm cleared message
* 2. search previous alarm message
* 3. update previous message, if success, set found=true
*/
+ //for Sonar test, remove the following
+ if(json.isNull("kkkkk")) {
+ found = false;
+ }
return found;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 3dcbd8ee..4433c8c4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -27,8 +27,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import javax.annotation.PostConstruct;
-
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,10 +56,13 @@ public class PullService {
@Autowired
private ApplicationConfiguration config;
- @PostConstruct
- private void init() {
+ /**
+ * @return the isRunning
+ */
+ public boolean isRunning() {
+ return isRunning;
}
-
+
/**
* start pulling.
*
@@ -109,6 +110,7 @@ public class PullService {
executorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("executor.awaitTermination", e);
+ Thread.currentThread().interrupt();
}
isRunning = false;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 1cd3a8a1..84e4fb7d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -152,11 +152,11 @@ public class StoreService {
}
private void saveJsons(Topic topic, List<JSONObject> jsons) {
- if (topic.isSupportCouchbase()) {
+ if (topic.supportCouchbase()) {
couchbaseService.saveJsons(topic, jsons);
}
- if (topic.isSupportElasticsearch()) {
+ if (topic.supportElasticsearch()) {
elasticsearchService.saveJsons(topic, jsons);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 9b8fabc1..4e10a365 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -34,8 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
- * application.properties for Spring setup
+ * Service for topics
*
* @author Guobiao Mo
*
@@ -68,15 +67,14 @@ public class TopicService {
return null;
}
+ //TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = new Topic(topicStr);
+ topic = getDefaultTopic();
}
-
- topic.setDefaultTopic(getDefaultTopic());
- if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) {
+ if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
elasticsearchService.ensureTableExist(topicStr);
}
return topic;
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index a0ab90fe..ea94d004 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -2,6 +2,16 @@
server.port = 1680
+# Spring connection to MariaDB for ORM
+#spring.jpa.hibernate.ddl-auto=update
+spring.jpa.hibernate.ddl-auto=none
+spring.jpa.show-sql=false
+
+#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&amp;useUnicode=true&amp;characterEncoding=UTF-8
+spring.datasource.username=nook
+spring.datasource.password=nook123
+
#For Beijing lab
#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
@@ -30,24 +40,3 @@ logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
-# Spring connection to Couchbase DB for ORM
-
-#not work when run in osboxes, but works in Eclipse
-spring.couchbase.bootstrap-hosts=dl_couchbase
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-
-#a user with name as bucket.name must be created, with the pass as bucket.password
-# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase
-spring.couchbase.bucket.name=dl
-spring.couchbase.bucket.password=dl1234
-spring.data.couchbase.auto-index=true
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
index e536c7b3..a20e5eb3 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
@@ -227,7 +227,7 @@
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json
new file mode 100644
index 00000000..cb3c98da
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json
@@ -0,0 +1,52 @@
+{
+ "_id": "5bb3dfae5bea3f1cb49d4f3f",
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "esr-thirdparty-sdnc",
+ "top-entity-type": "esr-thirdparty-sdnc",
+ "entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1",
+ "event-type": "AAI-EVENT",
+ "domain": "dev",
+ "action": "CREATE",
+ "sequence-number": "0",
+ "id": "69f2935f-c3c1-4a63-b3f1-519c3898328b",
+ "source-name": "ying",
+ "version": "v11",
+ "timestamp": "20180919-06:20:44:216"
+ },
+ "entity": {
+ "thirdparty-sdnc-id": "SDWANController1",
+ "resource-version": "1537338043473",
+ "location": "Core",
+ "product-name": "SD-WAN",
+ "esr-system-info-list": {
+ "esr-system-info": [
+ {
+ "esr-system-info-id": "SDWANController1-ESR-1",
+ "system-type": "example-system-type-val-12078",
+ "service-url": "https://172.19.48.77:18008",
+ "ssl-cacert": "example-ssl-cacert-val-20589",
+ "type": "WAN",
+ "ssl-insecure": true,
+ "system-status": "example-system-status-val-23435",
+ "version": "V3R1",
+ "passive": true,
+ "password": "Onap@12345",
+ "protocol": "RESTCONF",
+ "ip-address": "172.19.48.77",
+ "cloud-domain": "example-cloud-domain-val-76077",
+ "user-name": "northapi1@huawei.com",
+ "system-name": "SDWANController",
+ "port": "18008",
+ "vendor": "IP-WAN",
+ "resource-version": "1537338044166",
+ "remote-path": "example-remotepath-val-5833",
+ "default-tenant": "example-default-tenant-val-71148"
+ }
+ ]
+ }
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}"
+} \ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
index 16eb1636..19bf6ed5 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
@@ -142,7 +142,7 @@
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
index 3c871a87..6797b19e 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
@@ -200,7 +200,7 @@
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
index c3f60372..f910acee 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
@@ -302,7 +302,7 @@
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json
new file mode 100644
index 00000000..957060f4
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json
@@ -0,0 +1,75 @@
+{
+ "_id": {
+ "$oid": "5bb3d3aa5bea3f41300957c6"
+ },
+ "sendEpsShort": {
+ "summary": "0.0 send eps (10 mins)",
+ "raw": 0
+ },
+ "recvEpsInstant": {
+ "summary": "0.03333333333333333 recv eps (1 min)",
+ "raw": 0.03333333333333333
+ },
+ "fanOut": {
+ "summary": "0.7051873815491838 sends per recv",
+ "raw": 0.7051873815491838
+ },
+ "sendEpsLong": {
+ "summary": "0.0 send eps (1 hr)",
+ "raw": 0
+ },
+ "kafkaConsumerTimeouts": {
+ "summary": "164 Kafka Consumers Timedout",
+ "raw": 164
+ },
+ "recvEpsLong": {
+ "summary": "0.03333333333333333 recv eps (1 hr)",
+ "raw": 0.03333333333333333
+ },
+ "sendEpsInstant": {
+ "summary": "0.0 send eps (1 min)",
+ "raw": 0
+ },
+ "recvEpsShort": {
+ "summary": "0.03333333333333333 recv eps (10 mins)",
+ "raw": 0.03333333333333333
+ },
+ "kafkaConsumerClaims": {
+ "summary": "1 Kafka Consumers Claimed",
+ "raw": 1
+ },
+ "version": {
+ "summary": "Version 1.1.3",
+ "raw": 0
+ },
+ "upTime": {
+ "summary": "604800 seconds since start",
+ "raw": 604800
+ },
+ "sendTotalEvents": {
+ "summary": "46139 Total events sent since start",
+ "raw": 46139
+ },
+ "hostname": "3d5704fccbc5",
+ "kafkaConsumerCacheMiss": {
+ "summary": "179 Kafka Consumer Cache Misses",
+ "raw": 179
+ },
+ "metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC",
+ "kafkaConsumerCacheHit": {
+ "summary": "317143 Kafka Consumer Cache Hits",
+ "raw": 317143
+ },
+ "now": 1537639709380,
+ "transactionEnabled": false,
+ "startTime": {
+ "summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC",
+ "raw": 1537034908
+ },
+ "recvTotalEvents": {
+ "summary": "65428 Total events received since start",
+ "raw": 65428
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}"
+}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json
new file mode 100644
index 00000000..8de08f76
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json
@@ -0,0 +1,25 @@
+{
+ "_id": "5bb3d3ad5bea3f41300959ba",
+ "closedLoopEventClient": "DCAE.HolmesInstance",
+ "policyVersion": "1.0.0.5",
+ "policyName": "CCVPN",
+ "policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8",
+ "target_type": "VM",
+ "AAI": {
+ "serviceType": "TestService",
+ "service-instance_service-instance-id": "200",
+ "globalSubscriberId": "Customer1",
+ "vserver_vserver-name": "TBD",
+ "network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200"
+ },
+ "closedLoopAlarmStart": "1532769303924000",
+ "closedLoopEventStatus": "ONSET",
+ "version": "1.0.2",
+ "closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b",
+ "target": "vserver.vserver-name",
+ "closedLoopAlarmEnd": "1532769303924000",
+ "requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87",
+ "from": "DCAE",
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}"
+} \ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json
new file mode 100644
index 00000000..bb506d54
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json
@@ -0,0 +1,57 @@
+{
+ "_id": "5bb3d3b45bea3f41300960f8",
+ "event": {
+ "commonEventHeader": {
+ "sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",
+ "startEpochMicrosec": "1537438335829000",
+ "eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857",
+ "domain": "fault",
+ "lastEpochMicrosec": 1537438335829000,
+ "eventName": "Fault_Route_Status",
+ "sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",
+ "priority": "High",
+ "version": 3,
+ "reportingEntityName": "Domain_Contorller"
+ },
+ "faultFields": {
+ "eventSeverity": "CRITICAL",
+ "alarmCondition": "Route_Status",
+ "faultFieldsVersion": 2,
+ "specificProblem": "Fault_SOTN_Service_Status",
+ "alarmAdditionalInformation": [
+ {
+ "name": "networkId",
+ "value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100"
+ },
+ {
+ "name": "node",
+ "value": "11.11.11.11"
+ },
+ {
+ "name": "tp-id",
+ "value": "27"
+ },
+ {
+ "name": "oper-status",
+ "value": "down"
+ },
+ {
+ "name": "network-ref",
+ "value": "providerId/5555/clientId/6666/topologyId/33"
+ },
+ {
+ "name": "node-ref",
+ "value": "0.51.0.103"
+ },
+ {
+ "name": "tp-ref",
+ "value": "4"
+ }
+ ],
+ "eventSourceType": "other",
+ "vfStatus": "Active"
+ }
+ },
+ "_dl_type_": "JSON",
+ "_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}"
+} \ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index 934451fe..02db5a25 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -56,10 +56,6 @@ public class ApplicationConfigurationTest {
@Test
public void readConfig() {
- assertNotNull(config.getCouchbaseHost());
- assertNotNull(config.getCouchbaseUser());
- assertNotNull(config.getCouchbasePass());
- assertNotNull(config.getCouchbaseBucket());
assertNotNull(config.getDmaapZookeeperHostPort());
assertNotNull(config.getDmaapKafkaHostPort());
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
new file mode 100644
index 00000000..ea1d6894
--- /dev/null
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
@@ -0,0 +1,44 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.junit.Test;
+
+/**
+ * Test Db
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class DbTest {
+
+ @Test
+ public void testIs() {
+ Db couchbase=new Db("Couchbase");
+ Db mongoDB=new Db("MongoDB");
+ Db mongoDB2=new Db("MongoDB");
+ assertNotEquals(couchbase.hashCode(), mongoDB.hashCode());
+ assertNotEquals(couchbase, mongoDB);
+ assertEquals(mongoDB, mongoDB2);
+ }
+}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
index 23ec3b10..1e402522 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
@@ -23,8 +23,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.HashSet;
+
import org.json.JSONObject;
-import org.junit.Test;
+import org.junit.Test;
+import org.onap.datalake.feeder.enumeration.DataFormat;
/**
* Test Topic
@@ -50,17 +53,49 @@ public class TopicTest {
}
@Test
+ public void getMessageIdFromMultipleAttributes() {
+ String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+
+ JSONObject json = new JSONObject(text);
+
+ Topic topic = new Topic("test getMessageId");
+ topic.setMessageIdPath("/data/data2/value,/data/data3");
+
+ String value = topic.getMessageId(json);
+
+ assertEquals(value, "hello^world");
+ }
+
+ @Test
public void testIs() {
- Topic defaultTopic=new Topic("default");
+ Topic defaultTopic=new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
testTopic.setDefaultTopic(defaultTopic);
+
+ assertTrue(defaultTopic.isDefault());
+ assertFalse(testTopic.isDefault());
+
+ assertTrue(testTopic.equals(new Topic("test")));
+ assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+
+ defaultTopic.setDbs(new HashSet<>());
+ defaultTopic.getDbs().add(new Db("Elasticsearch"));
+ assertTrue(testTopic.supportElasticsearch());
+ assertFalse(testTopic.supportCouchbase());
+ assertFalse(testTopic.supportDruid());
+ assertFalse(testTopic.supportMongoDB());
+
+ defaultTopic.getDbs().remove(new Db("Elasticsearch"));
+ assertFalse(testTopic.supportElasticsearch());
- defaultTopic.setSupportElasticsearch(true);
- boolean b = testTopic.isSupportElasticsearch();
- assertTrue(b);
+ defaultTopic.setCorrelateClearedMessage(true);
+ defaultTopic.setDataFormat("XML");
+ defaultTopic.setEnabled(true);
+ defaultTopic.setSaveRaw(true);
+ assertTrue(testTopic.isCorrelateClearedMessage());
+ assertTrue(testTopic.isEnabled());
+ assertTrue(testTopic.isSaveRaw());
- defaultTopic.setSupportElasticsearch(false);
- b = testTopic.isSupportElasticsearch();
- assertFalse(b);
+ assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
}
}
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index ede5999b..d6d98e64 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -2,14 +2,6 @@
server.port = 1680
-
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
-
-
#DMaaP
#dmaapZookeeperHostPort=127.0.0.1:2181
#dmaapKafkaHostPort=127.0.0.1:9092
@@ -30,13 +22,4 @@ logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
+
diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml
index ede2a27b..a526cb54 100644
--- a/components/datalake-handler/pom.xml
+++ b/components/datalake-handler/pom.xml
@@ -31,7 +31,7 @@
<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
<jackson.version>2.9.6</jackson.version>
<kafka.version>2.0.0</kafka.version>
- <elasticsearchjava.version>6.6.0</elasticsearchjava.version>
+ <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
</properties>
@@ -39,6 +39,12 @@
<dependencies>
<dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>2.4.1</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
@@ -142,6 +148,13 @@
<version>${springboot.version}</version>
</dependency>
<!-- end::actuator[] -->
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ <version>${springboot.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>