diff options
author | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-04-10 00:35:43 -0700 |
---|---|---|
committer | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-04-10 00:35:43 -0700 |
commit | 35cc15e04411008b2f8094bbd3876e7a2daed587 (patch) | |
tree | 7ba9c49f05f8596b71395c98e0dca1fa61f7d12e /components/datalake-handler/feeder | |
parent | 2672b1261e87bff3e6526534bc51b56b97d2e5ab (diff) |
Support MongoDB as a data storage
Issue-ID: DCAEGEN2-1411
Change-Id: I06b69605e88d5b81500b788847e7c90ff4017a07
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
10 files changed, 257 insertions, 74 deletions
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index d6b50787..9c1bb785 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -17,17 +17,17 @@ <dependencies> - + <dependency> - <groupId>org.mariadb.jdbc</groupId> - <artifactId>mariadb-java-client</artifactId> + <groupId>org.mariadb.jdbc</groupId> + <artifactId>mariadb-java-client</artifactId> </dependency> - + <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> </dependency> - + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> @@ -47,12 +47,12 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-data-jpa</artifactId> - </dependency> - + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-data-jpa</artifactId> + </dependency> + <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-couchbase</artifactId> @@ -148,6 +148,10 @@ <scope>compile</scope> </dependency> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + </dependency> </dependencies> <build> diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index 2185320a..83db9f1f 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -21,6 +21,7 @@ CREATE TABLE `topic` ( CREATE TABLE `db` (
`name` varchar(255) NOT NULL,
`host` varchar(255) DEFAULT NULL,
+ `port` int(11) DEFAULT NULL,
`login` varchar(255) DEFAULT NULL,
`pass` varchar(255) DEFAULT NULL,
`property1` varchar(255) DEFAULT NULL,
@@ -40,9 +41,9 @@ CREATE TABLE `map_db_topic` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl');
insert into db (name,host) values ('Elasticsearch','dl_es');
-insert into db (name,host) values ('MongoDB','dl_mongodb');
+insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake');
insert into db (name,host) values ('Druid','dl_druid');
@@ -51,4 +52,4 @@ insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_D insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
-insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java new file mode 100644 index 00000000..2bc6faad --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java @@ -0,0 +1,60 @@ +package com.mongodb.internal.validator;
+
+//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
+//allow inserting name with dot
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+import org.bson.FieldNameValidator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A field name validator for document that are meant for storage in MongoDB collections. It ensures that no fields contain a '.',
+ * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected).
+ *
+ * <p>This class should not be considered a part of the public API.</p>
+ */
+public class CollectibleDocumentFieldNameValidator implements FieldNameValidator {
+ // Have to support DBRef fields
+ private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");
+
+ @Override
+ public boolean validate(final String fieldName) {
+ if (fieldName == null) {
+ throw new IllegalArgumentException("Field name can not be null");
+ }
+
+ /* dl change
+ if (fieldName.contains(".")) {
+ return false;
+ }*/
+
+ if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public FieldNameValidator getValidatorForField(final String fieldName) {
+ return this;
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index bf9e417f..747a72c8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -79,14 +79,14 @@ public class TopicController { @GetMapping("/dmaap/") @ResponseBody - @ApiOperation(value="List all topics in DMaaP.") + @ApiOperation(value="List all topic names in DMaaP.") public List<String> listDmaapTopics() throws IOException { return dmaapService.getTopics(); } @GetMapping("/") @ResponseBody - @ApiOperation(value="List all topics' details.") + @ApiOperation(value="List all topics' settings.") public Iterable<Topic> list() throws IOException { Iterable<Topic> ret = topicRepository.findAll(); return ret; @@ -94,7 +94,7 @@ public class TopicController { @GetMapping("/{topicName}") @ResponseBody - @ApiOperation(value="Get a topic's details.") + @ApiOperation(value="Get a topic's settings.") public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException { Topic topic = topicService.getTopic(topicName); return topic; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java index bbaedadc..306af490 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -31,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; - + /** * Domain class representing bid data storage * @@ -47,37 +47,44 @@ public class Db { private String name; private String host; + private Integer port; private String login; private String pass; private String property1; private String property2; - private String property3; + private String property3; @JsonBackReference - @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL) + @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL) /* - @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) - @JoinTable( name = "map_db_topic", - joinColumns = { @JoinColumn(name="db_name") }, - inverseJoinColumns = { @JoinColumn(name="topic_name") } - ) */ - protected Set<Topic> topics; - + @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="db_name") }, + inverseJoinColumns = { @JoinColumn(name="topic_name") } + ) */ + protected Set<Topic> topics; + public Db() { } public Db(String name) { this.name = name; } - + @Override - public boolean equals(Object obj) { - return name.equals(((Db)obj).getName()); + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return name.equals(((Db) obj).getName()); } @Override public int hashCode() { - return name.hashCode(); + return name.hashCode(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index e1da4d4d..20ebf94a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -41,9 +41,9 @@ import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; - + /** - * Domain class representing topic + * Domain class representing topic * * @author Guobiao Mo * @@ -56,7 +56,7 @@ public class Topic { @Id private String name;//topic name - @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) @JoinColumn(name = "default_topic", nullable = true) private Topic defaultTopic; @@ -67,18 +67,18 @@ public class Topic { //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) @JsonBackReference //@JsonManagedReference - @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) + @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER) @JoinTable( name = "map_db_topic", joinColumns = { @JoinColumn(name="topic_name") }, inverseJoinColumns = { @JoinColumn(name="db_name") } ) protected Set<Db> dbs; - + /** - * indicate if we should monitor this topic + * indicate if we should monitor this topic */ private Boolean enabled; - + /** * save raw message text */ @@ -94,12 +94,12 @@ public class Topic { /** * TTL in day */ - private Integer ttl; - + private Integer ttl; + //if this flag is true, need to correlate alarm cleared message to previous alarm @Column(name = "correlate_cleared_message") private Boolean correlateClearedMessage; - + //the value in the JSON with this path will be used as DB id @Column(name = "message_id_path") private String messageIdPath; @@ -114,15 +114,15 @@ public class Topic { public boolean isDefault() { return "_DL_DEFAULT_".equals(name); } - + public boolean isEnabled() { - return is(enabled, Topic::isEnabled); + return is(enabled, Topic::isEnabled); } public boolean isCorrelateClearedMessage() { return is(correlateClearedMessage, Topic::isCorrelateClearedMessage); } - + public int getTtl() { if (ttl != null) { return ttl; @@ -130,9 +130,9 @@ public class Topic { return defaultTopic.getTtl(); } else { return 3650;//default to 10 years for safe - } + } } - + public DataFormat getDataFormat() { if (dataFormat != null) { return DataFormat.fromString(dataFormat); @@ -147,7 +147,7 @@ public class Topic { private boolean is(Boolean b, Predicate<Topic> pre) { return is(b, pre, false); } - + private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) { if (b != null) { return b; @@ -178,53 +178,59 @@ public class Topic { return containDb("MongoDB"); } - private boolean containDb(String dbName) { + private boolean containDb(String dbName) { Db db = new Db(dbName); - - if(dbs!=null && dbs.contains(db)) { + + if (dbs != null && dbs.contains(db)) { return true; } - + if (defaultTopic != null) { return defaultTopic.containDb(dbName); } else { return false; } } - + //extract DB id from JSON attributes, support multiple attributes public String getMessageId(JSONObject json) { String id = null; - - if(StringUtils.isNotBlank(messageIdPath)) { - String[] paths=messageIdPath.split(","); - - StringBuilder sb= new StringBuilder(); - for(int i=0; i<paths.length; i++) { - if(i>0) { - sb.append('^'); + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); } - sb.append(json.query(paths[i]).toString()); + sb.append(json.query(paths[i]).toString()); } id = sb.toString(); } - + return id; } - + @Override public String toString() { return name; } @Override - public boolean equals(Object obj) { - return name.equals(((Topic)obj).getName()); + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return name.equals(((Topic) obj).getName()); } @Override public int hashCode() { - return name.hashCode(); + return name.hashCode(); } - + } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 1f637e1a..fea07187 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -29,9 +29,9 @@ import javax.annotation.PreDestroy; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -100,15 +100,14 @@ public class ElasticsearchService { public void ensureTableExist(String topic) throws IOException { String topicLower = topic.toLowerCase(); - GetIndexRequest request = new GetIndexRequest(); - request.indices(topicLower); + GetIndexRequest request = new GetIndexRequest(topicLower); boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); if(!exists){ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); - } + } } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java new file mode 100644 index 00000000..2b889215 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -0,0 +1,98 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.bson.Document; + +import org.json.JSONObject; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Service to use MongoDB + * + * @author Guobiao Mo + * + */ +@Service +public class MongodbService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbService dbService; + + private MongoDatabase database; + private MongoClient mongoClient; + private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>(); + + @PostConstruct + private void init() { + Db mongodb = dbService.getMongoDB(); + + mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort()); + database = mongoClient.getDatabase(mongodb.getProperty1()); + } + + @PreDestroy + public void cleanUp() { + mongoClient.close(); + } + + public void saveJsons(Topic topic, List<JSONObject> jsons) { + List<Document> documents = new ArrayList<>(jsons.size()); + for (JSONObject json : jsons) { + //convert org.json JSONObject to MongoDB Document + Document doc = Document.parse(json.toString()); + + String id = topic.getMessageId(json); //id can be null + if (id != null) { + doc.put("_id", id); + } + documents.add(doc); + } + + String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ . + MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); + collection.insertMany(documents); + + log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments()); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 84e4fb7d..d9fe12a7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -63,6 +63,9 @@ public class StoreService { private TopicService topicService; @Autowired + private MongodbService mongodbService; + + @Autowired private CouchbaseService couchbaseService; @Autowired @@ -152,6 +155,10 @@ public class StoreService { } private void saveJsons(Topic topic, List<JSONObject> jsons) { + if (topic.supportMongoDB()) { + mongodbService.saveJsons(topic, jsons); + } + if (topic.supportCouchbase()) { couchbaseService.saveJsons(topic, jsons); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 4e10a365..cd5113cc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -71,7 +71,8 @@ public class TopicService { public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { Topic topic = getTopic(topicStr); if (topic == null) { - topic = getDefaultTopic(); + topic = new Topic(topicStr); + topic.setDefaultTopic(getDefaultTopic()); } if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { |