summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-04-10 00:35:43 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-04-10 00:35:43 -0700
commit35cc15e04411008b2f8094bbd3876e7a2daed587 (patch)
tree7ba9c49f05f8596b71395c98e0dca1fa61f7d12e /components/datalake-handler/feeder/src
parent2672b1261e87bff3e6526534bc51b56b97d2e5ab (diff)
Support MongoDB as a data storage
Issue-ID: DCAEGEN2-1411 Change-Id: I06b69605e88d5b81500b788847e7c90ff4017a07 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src')
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql7
-rw-r--r--components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java60
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java35
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java78
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java98
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java3
9 files changed, 242 insertions, 63 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index 2185320a..83db9f1f 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -21,6 +21,7 @@ CREATE TABLE `topic` (
CREATE TABLE `db` (
`name` varchar(255) NOT NULL,
`host` varchar(255) DEFAULT NULL,
+ `port` int(11) DEFAULT NULL,
`login` varchar(255) DEFAULT NULL,
`pass` varchar(255) DEFAULT NULL,
`property1` varchar(255) DEFAULT NULL,
@@ -40,9 +41,9 @@ CREATE TABLE `map_db_topic` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl');
insert into db (name,host) values ('Elasticsearch','dl_es');
-insert into db (name,host) values ('MongoDB','dl_mongodb');
+insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake');
insert into db (name,host) values ('Druid','dl_druid');
@@ -51,4 +52,4 @@ insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_D
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
-insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
new file mode 100644
index 00000000..2bc6faad
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
@@ -0,0 +1,60 @@
+package com.mongodb.internal.validator;
+
+//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
+//allow inserting name with dot
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+import org.bson.FieldNameValidator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A field name validator for document that are meant for storage in MongoDB collections. It ensures that no fields contain a '.',
+ * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected).
+ *
+ * <p>This class should not be considered a part of the public API.</p>
+ */
+public class CollectibleDocumentFieldNameValidator implements FieldNameValidator {
+ // Have to support DBRef fields
+ private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");
+
+ @Override
+ public boolean validate(final String fieldName) {
+ if (fieldName == null) {
+ throw new IllegalArgumentException("Field name can not be null");
+ }
+
+ /* dl change
+ if (fieldName.contains(".")) {
+ return false;
+ }*/
+
+ if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public FieldNameValidator getValidatorForField(final String fieldName) {
+ return this;
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index bf9e417f..747a72c8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -79,14 +79,14 @@ public class TopicController {
@GetMapping("/dmaap/")
@ResponseBody
- @ApiOperation(value="List all topics in DMaaP.")
+ @ApiOperation(value="List all topic names in DMaaP.")
public List<String> listDmaapTopics() throws IOException {
return dmaapService.getTopics();
}
@GetMapping("/")
@ResponseBody
- @ApiOperation(value="List all topics' details.")
+ @ApiOperation(value="List all topics' settings.")
public Iterable<Topic> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
return ret;
@@ -94,7 +94,7 @@ public class TopicController {
@GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value="Get a topic's details.")
+ @ApiOperation(value="Get a topic's settings.")
public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
return topic;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index bbaedadc..306af490 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
-
+
/**
* Domain class representing bid data storage
*
@@ -47,37 +47,44 @@ public class Db {
private String name;
private String host;
+ private Integer port;
private String login;
private String pass;
private String property1;
private String property2;
- private String property3;
+ private String property3;
@JsonBackReference
- @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+ @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL)
/*
- @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
- @JoinTable( name = "map_db_topic",
- joinColumns = { @JoinColumn(name="db_name") },
- inverseJoinColumns = { @JoinColumn(name="topic_name") }
- ) */
- protected Set<Topic> topics;
-
+ @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="db_name") },
+ inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ ) */
+ protected Set<Topic> topics;
+
public Db() {
}
public Db(String name) {
this.name = name;
}
-
+
@Override
- public boolean equals(Object obj) {
- return name.equals(((Db)obj).getName());
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((Db) obj).getName());
}
@Override
public int hashCode() {
- return name.hashCode();
+ return name.hashCode();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index e1da4d4d..20ebf94a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -41,9 +41,9 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
-
+
/**
- * Domain class representing topic
+ * Domain class representing topic
*
* @author Guobiao Mo
*
@@ -56,7 +56,7 @@ public class Topic {
@Id
private String name;//topic name
- @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
@JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
@@ -67,18 +67,18 @@ public class Topic {
//@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
@JsonBackReference
//@JsonManagedReference
- @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
@JoinTable( name = "map_db_topic",
joinColumns = { @JoinColumn(name="topic_name") },
inverseJoinColumns = { @JoinColumn(name="db_name") }
)
protected Set<Db> dbs;
-
+
/**
- * indicate if we should monitor this topic
+ * indicate if we should monitor this topic
*/
private Boolean enabled;
-
+
/**
* save raw message text
*/
@@ -94,12 +94,12 @@ public class Topic {
/**
* TTL in day
*/
- private Integer ttl;
-
+ private Integer ttl;
+
//if this flag is true, need to correlate alarm cleared message to previous alarm
@Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
-
+
//the value in the JSON with this path will be used as DB id
@Column(name = "message_id_path")
private String messageIdPath;
@@ -114,15 +114,15 @@ public class Topic {
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
-
+
public boolean isEnabled() {
- return is(enabled, Topic::isEnabled);
+ return is(enabled, Topic::isEnabled);
}
public boolean isCorrelateClearedMessage() {
return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
}
-
+
public int getTtl() {
if (ttl != null) {
return ttl;
@@ -130,9 +130,9 @@ public class Topic {
return defaultTopic.getTtl();
} else {
return 3650;//default to 10 years for safe
- }
+ }
}
-
+
public DataFormat getDataFormat() {
if (dataFormat != null) {
return DataFormat.fromString(dataFormat);
@@ -147,7 +147,7 @@ public class Topic {
private boolean is(Boolean b, Predicate<Topic> pre) {
return is(b, pre, false);
}
-
+
private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
if (b != null) {
return b;
@@ -178,53 +178,59 @@ public class Topic {
return containDb("MongoDB");
}
- private boolean containDb(String dbName) {
+ private boolean containDb(String dbName) {
Db db = new Db(dbName);
-
- if(dbs!=null && dbs.contains(db)) {
+
+ if (dbs != null && dbs.contains(db)) {
return true;
}
-
+
if (defaultTopic != null) {
return defaultTopic.containDb(dbName);
} else {
return false;
}
}
-
+
//extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
String id = null;
-
- if(StringUtils.isNotBlank(messageIdPath)) {
- String[] paths=messageIdPath.split(",");
-
- StringBuilder sb= new StringBuilder();
- for(int i=0; i<paths.length; i++) {
- if(i>0) {
- sb.append('^');
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
}
- sb.append(json.query(paths[i]).toString());
+ sb.append(json.query(paths[i]).toString());
}
id = sb.toString();
}
-
+
return id;
}
-
+
@Override
public String toString() {
return name;
}
@Override
- public boolean equals(Object obj) {
- return name.equals(((Topic)obj).getName());
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((Topic) obj).getName());
}
@Override
public int hashCode() {
- return name.hashCode();
+ return name.hashCode();
}
-
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 1f637e1a..fea07187 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -29,9 +29,9 @@ import javax.annotation.PreDestroy;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -100,15 +100,14 @@ public class ElasticsearchService {
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
- GetIndexRequest request = new GetIndexRequest();
- request.indices(topicLower);
+ GetIndexRequest request = new GetIndexRequest(topicLower);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if(!exists){
CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
new file mode 100644
index 00000000..2b889215
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -0,0 +1,98 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.bson.Document;
+
+import org.json.JSONObject;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Service to use MongoDB
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class MongodbService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbService dbService;
+
+ private MongoDatabase database;
+ private MongoClient mongoClient;
+ private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+
+ @PostConstruct
+ private void init() {
+ Db mongodb = dbService.getMongoDB();
+
+ mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort());
+ database = mongoClient.getDatabase(mongodb.getProperty1());
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ mongoClient.close();
+ }
+
+ public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ List<Document> documents = new ArrayList<>(jsons.size());
+ for (JSONObject json : jsons) {
+ //convert org.json JSONObject to MongoDB Document
+ Document doc = Document.parse(json.toString());
+
+ String id = topic.getMessageId(json); //id can be null
+ if (id != null) {
+ doc.put("_id", id);
+ }
+ documents.add(doc);
+ }
+
+ String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ .
+ MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
+ collection.insertMany(documents);
+
+ log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 84e4fb7d..d9fe12a7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -63,6 +63,9 @@ public class StoreService {
private TopicService topicService;
@Autowired
+ private MongodbService mongodbService;
+
+ @Autowired
private CouchbaseService couchbaseService;
@Autowired
@@ -152,6 +155,10 @@ public class StoreService {
}
private void saveJsons(Topic topic, List<JSONObject> jsons) {
+ if (topic.supportMongoDB()) {
+ mongodbService.saveJsons(topic, jsons);
+ }
+
if (topic.supportCouchbase()) {
couchbaseService.saveJsons(topic, jsons);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 4e10a365..cd5113cc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -71,7 +71,8 @@ public class TopicService {
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = getDefaultTopic();
+ topic = new Topic(topicStr);
+ topic.setDefaultTopic(getDefaultTopic());
}
if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {