summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-06-27 18:42:59 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-06-27 18:46:11 -0700
commitb14c5766902d486a94a8db96d2a31ff0e9e8255e (patch)
tree421f9bd6ac50f36d5f128bfab7fd3653b9ff8894 /components/datalake-handler/feeder/src/main/java
parentb3f5051484f5b973a47a60fb8f76a67ca5ff88da (diff)
supports multiple Kafka clusters and DBs
Read data from Kafka and store into DBs Issue-ID: DCAEGEN2-1631 Change-Id: Ic2736b6e0497ac2084b1a7ce0da3a6e0e1379f43 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java16
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java54
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java30
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java64
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java17
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java127
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java75
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java10
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java38
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java35
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java5
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java28
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java34
-rwxr-xr-xcomponents/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java109
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java14
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java25
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java97
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java108
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java103
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java)38
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java37
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java)39
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java)49
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java)28
27 files changed, 748 insertions, 443 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index e371af1b..806dc72e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -54,6 +54,8 @@ public class ApplicationConfiguration {
private String defaultTopicName;
+ private int checkTopicInterval; //in millisecond
+/*
//DMaaP
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
@@ -68,7 +70,7 @@ public class ApplicationConfiguration {
private int dmaapCheckNewTopicInterval; //in millisecond
private int kafkaConsumerCount;
-
+*/
private String elasticsearchType;
//HDFS
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
index bd9b742b..322be412 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -27,8 +27,6 @@ import javax.servlet.http.HttpServletResponse;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
-import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.dto.DbConfig;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
import org.slf4j.Logger;
@@ -59,12 +57,6 @@ public class DbController {
@Autowired
private DbRepository dbRepository;
- @Autowired
- private TopicRepository topicRepository;
-
- @Autowired
- private DbService dbService;
-
//list all dbs
@GetMapping("")
@ResponseBody
@@ -92,11 +84,11 @@ public class DbController {
return null;
}
- Db oldDb = dbService.getDb(dbConfig.getName());
+/* Db oldDb = dbService.getDb(dbConfig.getName());
if (oldDb != null) {
sendError(response, 400, "Db already exists: " + dbConfig.getName());
return null;
- } else {
+ } else {*/
Db newdb = new Db();
newdb.setName(dbConfig.getName());
newdb.setHost(dbConfig.getHost());
@@ -118,7 +110,7 @@ public class DbController {
retBody.setReturnBody(retMsg);
retBody.setStatusCode(200);
return retBody;
- }
+ //}
}
//Show a db
@@ -191,7 +183,7 @@ public class DbController {
return null;
}
- Db oldDb = dbService.getDb(dbConfig.getName());
+ Db oldDb = dbRepository.findById(dbConfig.getId()).get();
if (oldDb == null) {
sendError(response, 404, "Db not found: " + dbConfig.getName());
return null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 93cec8bb..1162aedd 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -27,17 +27,18 @@ import java.util.Set;
import javax.servlet.http.HttpServletResponse;
import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
import org.onap.datalake.feeder.service.TopicService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.DeleteMapping;
@@ -71,19 +72,27 @@ public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ //@Autowired
+ //private DmaapService dmaapService;
+
@Autowired
- private DmaapService dmaapService;
+ private ApplicationContext context;
@Autowired
+ private KafkaRepository kafkaRepository;
+
+ @Autowired
private TopicRepository topicRepository;
@Autowired
private TopicService topicService;
- @GetMapping("/dmaap")
+ @GetMapping("/dmaap/{kafkaId}")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics() {
+ public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) {
+ Kafka kafka = kafkaRepository.findById(kafkaId).get();
+ DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
return dmaapService.getTopics();
}
@@ -95,7 +104,7 @@ public class TopicController {
List<String> retString = new ArrayList<>();
for(Topic item : ret)
{
- if(!topicService.istDefaultTopic(item))
+ if(!topicService.isDefaultTopic(item))
retString.add(item.getName());
}
return retString;
@@ -110,24 +119,25 @@ public class TopicController {
sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ /*Topic oldTopic = topicService.getTopic(topicConfig.getName());
if (oldTopic != null) {
sendError(response, 400, "Topic already exists "+topicConfig.getName());
return null;
- } else {
+ } else {*/
Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
if(wTopic.getTtl() == 0)
wTopic.setTtl(3650);
topicRepository.save(wTopic);
return mkPostReturnBody(200, wTopic);
- }
+ //}
+ //FIXME need to connect to Kafka
}
- @GetMapping("/{topicName}")
+ @GetMapping("/{topicId}")
@ResponseBody
@ApiOperation(value="Get a topic's settings.")
- public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException {
- Topic topic = topicService.getTopic(topicName);
+ public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicId);
if(topic == null) {
sendError(response, 404, "Topic not found");
return null;
@@ -137,23 +147,23 @@ public class TopicController {
//This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
//One exception is that old DBs are kept
- @PutMapping("/{topicName}")
+ @PutMapping("/{topicId}")
@ResponseBody
@ApiOperation(value="Update a topic.")
- public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
+ public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- if(!topicName.equals(topicConfig.getName()))
+ if(topicId!=topicConfig.getId())
{
- sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName());
+ sendError(response, 400, "Topic name mismatch" + topicId + topicConfig);
return null;
}
- Topic oldTopic = topicService.getTopic(topicConfig.getName());
+ Topic oldTopic = topicService.getTopic(topicId);
if (oldTopic == null) {
sendError(response, 404, "Topic not found "+topicConfig.getName());
return null;
@@ -164,14 +174,14 @@ public class TopicController {
}
}
- @DeleteMapping("/{topicName}")
+ @DeleteMapping("/{topicId}")
@ResponseBody
- @ApiOperation(value="Update a topic.")
- public void deleteTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException
+ @ApiOperation(value="Delete a topic.")
+ public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException
{
- Topic oldTopic = topicService.getTopic(topicName);
+ Topic oldTopic = topicService.getTopic(topicId);
if (oldTopic == null) {
- sendError(response, 404, "Topic not found "+topicName);
+ sendError(response, 404, "Topic not found "+topicId);
} else {
Set<Db> dbRelation = oldTopic.getDbs();
dbRelation.clear();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index d84b34f8..7059cd09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -32,6 +32,9 @@ import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
+
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+
import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
@@ -51,12 +54,12 @@ public class Db {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "`id`")
- private Integer id;
+ private int id;
@Column(name="`name`")
private String name;
- @Column(name="`enabled`")
+ @Column(name="`enabled`", nullable = false)
private boolean enabled;
@Column(name="`host`")
@@ -98,13 +101,30 @@ public class Db {
)
private Set<Topic> topics;
- public Db() {
+ public boolean isHdfs() {
+ return isDb(DbTypeEnum.HDFS);
+ }
+
+ public boolean isElasticsearch() {
+ return isDb(DbTypeEnum.ES);
+ }
+
+ public boolean isCouchbase() {
+ return isDb(DbTypeEnum.CB);
+ }
+
+ public boolean isDruid() {
+ return isDb(DbTypeEnum.DRUID);
}
- public Db(String name) {
- this.name = name;
+ public boolean isMongoDB() {
+ return isDb(DbTypeEnum.MONGO);
}
+ private boolean isDb(DbTypeEnum dbTypeEnum) {
+ return dbTypeEnum.equals(DbTypeEnum.valueOf(dbType.getId()));
+ }
+
@Override
public String toString() {
return String.format("Db %s (name=%, enabled=%s)", id, name, enabled);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
index 0a88b155..9c83a9cd 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
@@ -48,14 +48,14 @@ public class DbType {
@Column(name="`id`")
private String id;
- @Column(name="`name`")
+ @Column(name="`name`", nullable = false)
private String name;
@Column(name="`default_port`")
private Integer defaultPort;
- @Column(name="`tool`")
- private Boolean tool;
+ @Column(name="`tool`", nullable = false)
+ private boolean tool;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "dbType")
protected Set<Db> dbs = new HashSet<>();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
new file mode 100644
index 00000000..df7aad04
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
@@ -0,0 +1,64 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+/**
+ * A warper of parent Topic
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class EffectiveTopic {
+ private Topic topic; //base Topic
+
+ String name;
+
+ public EffectiveTopic(Topic baseTopic) {
+ topic = baseTopic;
+ }
+
+ public EffectiveTopic(Topic baseTopic, String name ) {
+ topic = baseTopic;
+ this.name = name;
+ }
+
+ public String getName() {
+ return name==null?topic.getName():name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Topic getTopic() {
+ return topic;
+ }
+
+ public void setTopic(Topic topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("EffectiveTopic %s (base Topic=%s)", getName(), topic.toString());
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
index e3347a4a..d2189cbc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -49,23 +49,23 @@ public class Kafka {
@Column(name="`id`")
private String id;
- @Column(name="`name`")
+ @Column(name="`name`", nullable = false)
private String name;
- @Column(name="`enabled`")
+ @Column(name="`enabled`", nullable = false)
private boolean enabled;
- @Column(name="broker_list")
+ @Column(name="broker_list", nullable = false)
private String brokerList;//message-router-kafka:9092,message-router-kafka2:9092
- @Column(name="`zk`")
+ @Column(name="`zk`", nullable = false)
private String zooKeeper;//message-router-zookeeper:2181
@Column(name="`group`", columnDefinition = "varchar(255) DEFAULT 'datalake'")
private String group;
@Column(name="`secure`", columnDefinition = " bit(1) DEFAULT 0")
- private Boolean secure;
+ private boolean secure;
@Column(name="`login`")
private String login;
@@ -81,8 +81,7 @@ public class Kafka {
@Column(name="`included_topic`")
private String includedTopic;
- //@Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'")
- @Column(name="`excluded_topic`")
+ @Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'")
private String excludedTopic;
@Column(name="`consumer_count`", columnDefinition = "integer default 3")
@@ -93,8 +92,8 @@ public class Kafka {
private Integer timeout;
//don't show this field in admin UI
- @Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10")
- private Integer checkTopicInterval;
+ //@Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10")
+// private Integer checkTopicInterval;
@JsonBackReference
@ManyToMany(fetch = FetchType.EAGER)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index cb07e140..a27b6756 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -20,6 +20,7 @@
package org.onap.datalake.feeder.domain;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -33,7 +34,16 @@ import javax.persistence.ManyToMany;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -71,30 +81,30 @@ public class Topic {
//@JsonManagedReference
@ManyToMany(fetch = FetchType.EAGER)
@JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
- protected Set<Db> dbs;
+ protected Set<Db> dbs=new HashSet<>();
@ManyToMany(fetch = FetchType.EAGER)
@JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
- protected Set<Kafka> kafkas;
+ protected Set<Kafka> kafkas=new HashSet<>();
/**
* indicate if we should monitor this topic
*/
- @Column(name = "`enabled`")
- private Boolean enabled;
+ @Column(name = "`enabled`", nullable = false)
+ private boolean enabled;
/**
* save raw message text
*/
- @Column(name = "`save_raw`")
- private Boolean saveRaw;
+ @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+ private boolean saveRaw;
/**
* need to explicitly tell feeder the data format of the message. support JSON,
* XML, YAML, TEXT
*/
@Column(name = "`data_format`")
- private String dataFormat;
+ protected String dataFormat;
/**
* TTL in day
@@ -103,41 +113,33 @@ public class Topic {
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
- @Column(name = "`correlate_cleared_message`")
- private Boolean correlateClearedMessage;
+ @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+ private boolean correlateClearedMessage;
//paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
@Column(name = "`message_id_path`")
- private String messageIdPath;
+ protected String messageIdPath;
//paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
- @Column(name = "`aggregate_array_path`")
- private String aggregateArrayPath;
+ @Column(name = "`aggregate_array_path`")
+ protected String aggregateArrayPath;
//paths to the element in array that need flatten, this element is used as label, comma separated,
//example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
- @Column(name = "`flatten_array_path`")
- private String flattenArrayPath;
+ @Column(name = "`flatten_array_path`")
+ protected String flattenArrayPath;
public Topic() {
}
-
+/*
public Topic(String name) {//TODO
//this.name = name;
}
-
+*/
public String getName() {
return topicName.getId();
}
- public boolean isEnabled() {
- return is(enabled);
- }
-
- public boolean isCorrelateClearedMessage() {
- return is(correlateClearedMessage);
- }
-
public int getTtl() {
if (ttl != null) {
return ttl;
@@ -145,27 +147,86 @@ public class Topic {
return 3650;//default to 10 years for safe
}
}
+/*
+ public boolean supportHdfs() {
+ return supportDb(DbTypeEnum.HDFS);
+ }
+
+ public boolean supportElasticsearch() {
+ return supportDb(DbTypeEnum.ES);
+ }
+
+ public boolean supportCouchbase() {
+ return supportDb(DbTypeEnum.CB);
+ }
- private boolean is(Boolean b) {
- return is(b, false);
+ public boolean supportDruid() {
+ return supportDb(DbTypeEnum.DRUID);
}
- private boolean is(Boolean b, boolean defaultValue) {
- if (b != null) {
- return b;
+ public boolean supportMongoDB() {
+ return supportDb(DbTypeEnum.MONGO);
+ }
+
+ private boolean supportDb(DbTypeEnum dbTypeEnum) {
+ for(Db db : dbs) {
+
+ }
+ }
+*/
+ public DataFormat getDataFormat2() {
+ if (dataFormat != null) {
+ return DataFormat.fromString(dataFormat);
} else {
- return defaultValue;
+ return null;
+ }
+ }
+
+ public String[] getAggregateArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(aggregateArrayPath)) {
+ ret = aggregateArrayPath.split(",");
+ }
+
+ return ret;
+ }
+
+ public String[] getFlattenArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(flattenArrayPath)) {
+ ret = flattenArrayPath.split(",");
}
+
+ return ret;
}
- public boolean isSaveRaw() {
- return is(saveRaw);
+ //extract DB id from JSON attributes, support multiple attributes
+ public String getMessageId(JSONObject json) {
+ String id = null;
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
+ }
+
+ return id;
}
public TopicConfig getTopicConfig() {
TopicConfig tConfig = new TopicConfig();
- //tConfig.setName(getName());
+ tConfig.setId(getId());
+ tConfig.setName(getName());
tConfig.setLogin(getLogin());
tConfig.setEnabled(isEnabled());
tConfig.setDataFormat(dataFormat);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
index 0b6c54c3..eff87114 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
@@ -33,6 +33,7 @@ import lombok.Setter;
@Getter
@Setter
public class DbConfig {
+ private int id;
private String name;
private String host;
private boolean enabled;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 1fffa7ec..ace7bfa9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -41,6 +41,7 @@ import org.onap.datalake.feeder.enumeration.DataFormat;
public class TopicConfig {
+ private int id;
private String name;
private String login;
private String password;
@@ -54,79 +55,7 @@ public class TopicConfig {
private String messageIdPath;
private String aggregateArrayPath;
private String flattenArrayPath;
-
- public DataFormat getDataFormat2() {
- if (dataFormat != null) {
- return DataFormat.fromString(dataFormat);
- } else {
- return null;
- }
- }
-
- public boolean supportHdfs() {
- return supportDb("HDFS");
- }
-
- public boolean supportElasticsearch() {
- return supportDb("Elasticsearch");//TODO string hard codes
- }
-
- public boolean supportCouchbase() {
- return supportDb("Couchbase");
- }
-
- public boolean supportDruid() {
- return supportDb("Druid");
- }
-
- public boolean supportMongoDB() {
- return supportDb("MongoDB");
- }
-
- private boolean supportDb(String dbName) {
- return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName));
- }
-
- //extract DB id from JSON attributes, support multiple attributes
- public String getMessageId(JSONObject json) {
- String id = null;
-
- if (StringUtils.isNotBlank(messageIdPath)) {
- String[] paths = messageIdPath.split(",");
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < paths.length; i++) {
- if (i > 0) {
- sb.append('^');
- }
- sb.append(json.query(paths[i]).toString());
- }
- id = sb.toString();
- }
-
- return id;
- }
-
- public String[] getAggregateArrayPath2() {
- String[] ret = null;
-
- if (StringUtils.isNotBlank(aggregateArrayPath)) {
- ret = aggregateArrayPath.split(",");
- }
-
- return ret;
- }
-
- public String[] getFlattenArrayPath2() {
- String[] ret = null;
-
- if (StringUtils.isNotBlank(flattenArrayPath)) {
- ret = flattenArrayPath.split(",");
- }
-
- return ret;
- }
-
+
@Override
public String toString() {
return String.format("TopicConfig %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
index 9b1eb23b..05d76d55 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
@@ -26,7 +26,7 @@ package org.onap.datalake.feeder.enumeration;
*
*/
public enum DbTypeEnum {
- CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana");
+ CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset");
private final String name;
@@ -34,12 +34,4 @@ public enum DbTypeEnum {
this.name = name;
}
- public static DbTypeEnum fromString(String s) {
- for (DbTypeEnum df : DbTypeEnum.values()) {
- if (df.name.equalsIgnoreCase(s)) {
- return df;
- }
- }
- throw new IllegalArgumentException("Invalid value for db: " + s);
- }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
new file mode 100644
index 00000000..157fbf94
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
@@ -0,0 +1,38 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+/**
+ * Design type
+ *
+ * @author Guobiao Mo
+ *
+ */
+public enum DesignTypeEnum {
+ KIBANA_DB("Kibana Dashboard"), KIBANA_SEARCH("Kibana Search"), KIBANA_VISUAL("Kibana Visualization"),
+ ES_MAPPING("Elasticsearch Field Mapping Template"), DRUID_KAFKA_SPEC("Druid Kafka Indexing Service Supervisor Spec");
+
+ private final String name;
+
+ DesignTypeEnum(String name) {
+ this.name = name;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
new file mode 100644
index 00000000..9f8ea8a9
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
@@ -0,0 +1,35 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.TopicName;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * TopicName Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface TopicNameRepository extends CrudRepository<TopicName, String> {
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 182bf6f1..b4dd6374 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -19,6 +19,9 @@
*/
package org.onap.datalake.feeder.repository;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.Portal;
import org.onap.datalake.feeder.domain.Topic;
import org.springframework.data.repository.CrudRepository;
@@ -32,5 +35,5 @@ import org.springframework.data.repository.CrudRepository;
*/
public interface TopicRepository extends CrudRepository<Topic, Integer> {
-
+ //List<Topic> findByTopicName(String topicStr);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index 6d6fb750..2e934e2e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -20,9 +20,6 @@
package org.onap.datalake.feeder.service;
-import java.util.Optional;
-
-import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.repository.DbRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -38,29 +35,4 @@ public class DbService {
@Autowired
private DbRepository dbRepository;
-
- public Db getDb(String name) {
- return dbRepository.findByName(name);
- }
-
- public Db getCouchbase() {
- return getDb("Couchbase");
- }
-
- public Db getElasticsearch() {
- return getDb("Elasticsearch");
- }
-
- public Db getMongoDB() {
- return getDb("MongoDB");
- }
-
- public Db getDruid() {
- return getDb("Druid");
- }
-
- public Db getHdfs() {
- return getDb("HDFS");
- }
-
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 5c544d6c..1bfd437f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -24,7 +24,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
@@ -35,6 +37,8 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +64,12 @@ public class DmaapService {
private ZooKeeper zk;
+ private Kafka kafka;
+
+ public DmaapService(Kafka kafka) {
+ this.kafka = kafka;
+ }
+
@PreDestroy
public void cleanUp() throws InterruptedException {
config.getShutdownLock().readLock().lock();
@@ -76,7 +86,7 @@ public class DmaapService {
@PostConstruct
private void init() throws IOException, InterruptedException {
- zk = connect(config.getDmaapZookeeperHostPort());
+ zk = connect(kafka.getZooKeeper());
}
//get all topic names from Zookeeper
@@ -84,11 +94,11 @@ public class DmaapService {
public List<String> getTopics() {
try {
if (zk == null) {
- zk = connect(config.getDmaapZookeeperHostPort());
+ zk = connect(kafka.getZooKeeper());
}
- log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
+ log.info("connecting to ZooKeeper {} for a list of topics.", kafka.getZooKeeper());
List<String> topics = zk.getChildren("/brokers/topics", false);
- String[] excludes = config.getDmaapKafkaExclude();
+ String[] excludes = kafka.getExcludedTopic().split(",");
topics.removeAll(Arrays.asList(excludes));
log.info("list of topics: {}", topics);
return topics;
@@ -100,7 +110,7 @@ public class DmaapService {
}
private ZooKeeper connect(String host) throws IOException, InterruptedException {
- log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+ log.info("connecting to ZooKeeper {} ...", kafka.getZooKeeper());
CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
public void process(WatchedEvent we) {
@@ -126,18 +136,18 @@ public class DmaapService {
return ret;
}
*/
- public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+ public Map<String, List<EffectiveTopic>> getActiveEffectiveTopic() throws IOException {
log.debug("entering getActiveTopicConfigs()...");
- List<String> allTopics = getTopics();
+ List<String> allTopics = getTopics(); //topics in Kafka cluster TODO update table topic_name with new topics
- List<TopicConfig> ret = new ArrayList<>(allTopics.size());
+ Map<String, List<EffectiveTopic>> ret = new HashMap<>();
for (String topicStr : allTopics) {
log.debug("get topic setting from DB: {}.", topicStr);
- TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
- if (topicConfig.isEnabled()) {
- ret.add(topicConfig);
- }
+ List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
+
+ ret.put(topicStr , effectiveTopics);
+
}
return ret;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
index df701e88..408e4971 100755
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
@@ -23,15 +23,27 @@ package org.onap.datalake.feeder.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.domain.DesignType;
import org.onap.datalake.feeder.domain.Portal;
import org.onap.datalake.feeder.domain.PortalDesign;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.domain.TopicName;
import org.onap.datalake.feeder.dto.PortalDesignConfig;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.enumeration.DesignTypeEnum;
import org.onap.datalake.feeder.repository.DesignTypeRepository;
import org.onap.datalake.feeder.repository.PortalDesignRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.HttpClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +63,11 @@ public class PortalDesignService {
static String POST_FLAG;
- @Autowired
- private PortalDesignRepository portalDesignRepository;
+ @Autowired
+ private PortalDesignRepository portalDesignRepository;
- @Autowired
- private TopicService topicService;
+ @Autowired
+ private TopicNameRepository topicNameRepository;
@Autowired
private DesignTypeRepository designTypeRepository;
@@ -63,17 +75,13 @@ public class PortalDesignService {
@Autowired
private ApplicationConfiguration applicationConfiguration;
- @Autowired
- private DbService dbService;
-
- public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception
- {
+ public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception {
PortalDesign portalDesign = new PortalDesign();
fillPortalDesign(portalDesignConfig, portalDesign);
return portalDesign;
}
- public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception
- {
+
+ public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception {
fillPortalDesign(portalDesignConfig, portalDesign);
}
@@ -86,32 +94,34 @@ public class PortalDesignService {
portalDesign.setSubmitted(portalDesignConfig.getSubmitted());
if (portalDesignConfig.getTopic() != null) {
- Topic topic = topicService.getTopic(portalDesignConfig.getTopic());
- if (topic == null) throw new IllegalArgumentException("topic is null");
- portalDesign.setTopicName(topic.getTopicName());
- }else {
- throw new IllegalArgumentException("Can not find topic in DB, topic name: "+portalDesignConfig.getTopic());
+ Optional<TopicName> topicName = topicNameRepository.findById(portalDesignConfig.getTopic());
+ if (topicName.isPresent()) {
+ portalDesign.setTopicName(topicName.get());
+ } else {
+ throw new IllegalArgumentException("topic is null " + portalDesignConfig.getTopic());
+ }
+ } else {
+ throw new IllegalArgumentException("Can not find topic in DB, topic name: " + portalDesignConfig.getTopic());
}
if (portalDesignConfig.getDesignType() != null) {
DesignType designType = designTypeRepository.findById(portalDesignConfig.getDesignType()).get();
- if (designType == null) throw new IllegalArgumentException("designType is null");
+ if (designType == null)
+ throw new IllegalArgumentException("designType is null");
portalDesign.setDesignType(designType);
- }else {
- throw new IllegalArgumentException("Can not find designType in Design_type, designType name "+portalDesignConfig.getDesignType());
+ } else {
+ throw new IllegalArgumentException("Can not find designType in Design_type, designType name " + portalDesignConfig.getDesignType());
}
}
-
public PortalDesign getPortalDesign(Integer id) {
-
+
Optional<PortalDesign> ret = portalDesignRepository.findById(id);
return ret.isPresent() ? ret.get() : null;
}
-
- public List<PortalDesignConfig> queryAllPortalDesign(){
+ public List<PortalDesignConfig> queryAllPortalDesign() {
List<PortalDesign> portalDesignList = null;
List<PortalDesignConfig> portalDesignConfigList = new ArrayList<>();
@@ -125,30 +135,21 @@ public class PortalDesignService {
return portalDesignConfigList;
}
-
- public boolean deploy(PortalDesign portalDesign){
- boolean flag =true;
- String designTypeName = portalDesign.getDesignType().getName();
- if (portalDesign.getDesignType() != null && "kibana_db".equals(designTypeName)) {
- flag = deployKibanaImport(portalDesign);
- } else if (portalDesign.getDesignType() != null && "kibana_visual".equals(designTypeName)) {
- //TODO
- flag =false;
- } else if (portalDesign.getDesignType() != null && "kibana_search".equals(designTypeName)) {
- //TODO
- flag = false;
- } else if (portalDesign.getDesignType() != null && "es_mapping".equals(designTypeName)) {
- flag = postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase());
- } else if (portalDesign.getDesignType() != null && "druid_kafka_spec".equals(designTypeName)) {
- //TODO
- flag =false;
- } else {
- flag =false;
+ public boolean deploy(PortalDesign portalDesign) {
+ DesignType designType = portalDesign.getDesignType();
+ DesignTypeEnum designTypeEnum = DesignTypeEnum.valueOf(designType.getId());
+
+ switch (designTypeEnum) {
+ case KIBANA_DB:
+ return deployKibanaImport(portalDesign);
+ case ES_MAPPING:
+ return postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase());
+ default:
+ log.error("Not implemented {}", designTypeEnum);
+ return false;
}
- return flag;
}
-
private boolean deployKibanaImport(PortalDesign portalDesign) throws RuntimeException {
POST_FLAG = "KibanaDashboardImport";
String requestBody = portalDesign.getBody();
@@ -168,20 +169,16 @@ public class PortalDesignService {
}
-
- private String kibanaImportUrl(String host, Integer port){
+ private String kibanaImportUrl(String host, Integer port) {
if (port == null) {
port = applicationConfiguration.getKibanaPort();
}
- return "http://"+host+":"+port+applicationConfiguration.getKibanaDashboardImportApi();
+ return "http://" + host + ":" + port + applicationConfiguration.getKibanaDashboardImportApi();
}
-
/**
- * successed resp:
- * {
- * "acknowledged": true
- * }
+ * successed resp: { "acknowledged": true }
+ *
* @param portalDesign
* @param templateName
* @return flag
@@ -189,7 +186,13 @@ public class PortalDesignService {
public boolean postEsMappingTemplate(PortalDesign portalDesign, String templateName) throws RuntimeException {
POST_FLAG = "ElasticsearchMappingTemplate";
String requestBody = portalDesign.getBody();
- return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG);
+
+ //FIXME
+ Set<Db> dbs = portalDesign.getDbs();
+ //submit to each ES in dbs
+
+ //return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG);
+ return false;
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index dc04cf60..65de0bdc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -50,7 +50,7 @@ public class PullService {
private boolean isRunning = false;
private ExecutorService executorService;
- private Thread topicConfigPollingThread;
+// private Thread topicConfigPollingThread;
private Set<Puller> pullers;
@Autowired
@@ -94,10 +94,11 @@ public class PullService {
}
}
- topicConfigPollingThread = new Thread(topicConfigPollingService);
+ executorService.submit(topicConfigPollingService);
+ /*topicConfigPollingThread = new Thread(topicConfigPollingService);
topicConfigPollingThread.setName("TopicConfigPolling");
topicConfigPollingThread.start();
-
+*/
isRunning = true;
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
@@ -126,11 +127,12 @@ public class PullService {
puller.shutdown();
}
- logger.info("stop TopicConfigPollingService ...");
- topicConfigPollingService.shutdown();
+// logger.info("stop TopicConfigPollingService ...");
+// topicConfigPollingService.shutdown();
- topicConfigPollingThread.join();
+ // topicConfigPollingThread.join();
+ logger.info("stop executorService ...");
executorService.shutdown();
executorService.awaitTermination(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 5cc3b55d..1550e531 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -29,7 +29,6 @@ import java.util.Properties;
import javax.annotation.PostConstruct;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -54,7 +53,6 @@ import org.springframework.stereotype.Service;
*/
@Service
-//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Puller implements Runnable {
@Autowired
@@ -75,6 +73,9 @@ public class Puller implements Runnable {
private Kafka kafka;
+ public Puller( ) {
+
+ }
public Puller(Kafka kafka) {
this.kafka = kafka;
}
@@ -84,11 +85,11 @@ public class Puller implements Runnable {
async = config.isAsync();
}
- private Properties getConsumerConfig() {//00
+ private Properties getConsumerConfig() {
Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, kafka.getGroup());
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -96,10 +97,10 @@ public class Puller implements Runnable {
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) {
- String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;";
+ if (kafka.isSecure()) {
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + kafka.getLogin() + " password=" + kafka.getPass() + " serviceName=kafka;";
consumerConfig.put("sasl.jaas.config", jaas);
- consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol());
+ consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafka.getSecurityProtocol());
consumerConfig.put("sasl.mechanism", "PLAIN");
}
return consumerConfig;
@@ -120,8 +121,8 @@ public class Puller implements Runnable {
try {
while (active) {
- if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
- List<String> topics = topicConfigPollingService.getActiveTopics(kafka);//00
+ if (topicConfigPollingService.isActiveTopicsChanged(kafka)) {
+ Collection<String> topics = topicConfigPollingService.getActiveTopics(kafka);
log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
consumer.subscribe(topics, rebalanceListener);
}
@@ -141,7 +142,7 @@ public class Puller implements Runnable {
KafkaConsumer<String, String> consumer = consumerLocal.get();
log.debug("pulling...");
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(kafka.getTimeout()));
log.debug("done pulling.");
if (records != null && records.count() > 0) {
@@ -153,7 +154,7 @@ public class Puller implements Runnable {
messages.add(Pair.of(record.timestamp(), record.value()));
//log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
}
- storeService.saveMessages(kafka, partition.topic(), messages);//00
+ storeService.saveMessages(kafka, partition.topic(), messages);
log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 291f1cad..f5a7698d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import javax.annotation.PostConstruct;
@@ -32,13 +34,23 @@ import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,19 +73,10 @@ public class StoreService {
private ApplicationConfiguration config;
@Autowired
- private TopicConfigPollingService configPollingService;
-
- @Autowired
- private MongodbService mongodbService;
+ private ApplicationContext context;
@Autowired
- private CouchbaseService couchbaseService;
-
- @Autowired
- private ElasticsearchService elasticsearchService;
-
- @Autowired
- private HdfsService hdfsService;
+ private TopicConfigPollingService configPollingService;
private ObjectMapper yamlReader;
@@ -87,23 +90,41 @@ public class StoreService {
return;
}
- TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
+ Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+ for(EffectiveTopic effectiveTopic:effectiveTopics) {
+ saveMessagesForTopic(effectiveTopic, messages);
+ }
+ }
+
+ private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+ if (!effectiveTopic.getTopic().isEnabled()) {
+ log.error("we should not come here {}", effectiveTopic);
+ return;
+ }
List<JSONObject> docs = new ArrayList<>();
for (Pair<Long, String> pair : messages) {
try {
- docs.add(messageToJson(topicConfig, pair));
+ docs.add(messageToJson(effectiveTopic, pair));
} catch (Exception e) {
//may see org.json.JSONException.
log.error("Error when converting this message to JSON: " + pair.getRight(), e);
}
}
- saveJsons(topicConfig, docs, messages);
+ Set<Db> dbs = effectiveTopic.getTopic().getDbs();
+
+ for (Db db : dbs) {
+ if (db.getDbType().isTool() || !db.isEnabled()) {
+ continue;
+ }
+ DbStoreService dbStoreService = findDbStoreService(db);
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
}
- private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -114,11 +135,11 @@ public class StoreService {
// log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topicConfig.isSaveRaw();
+ boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topicConfig.getDataFormat2();
+ DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -149,15 +170,15 @@ public class StoreService {
json.put(config.getRawDataLabel(), text);
}
- if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
- String[] paths = topicConfig.getAggregateArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2();
for (String path : paths) {
JsonUtil.arrayAggregate(path, json);
}
}
- if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
- String[] paths = topicConfig.getFlattenArrayPath2();
+ if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) {
+ String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2();
for (String path : paths) {
JsonUtil.flattenArray(path, json);
}
@@ -166,29 +187,29 @@ public class StoreService {
return json;
}
- private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
- if (topic.supportMongoDB()) {
- mongodbService.saveJsons(topic, jsons);
- }
-
- if (topic.supportCouchbase()) {
- couchbaseService.saveJsons(topic, jsons);
- }
-
- if (topic.supportElasticsearch()) {
- elasticsearchService.saveJsons(topic, jsons);
- }
-
- if (topic.supportHdfs()) {
- hdfsService.saveMessages(topic, messages);
+ private DbStoreService findDbStoreService(Db db) {
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ switch (dbTypeEnum) {
+ case CB:
+ return context.getBean(CouchbaseService.class, db);
+ case ES:
+ return context.getBean(ElasticsearchService.class, db);
+ case HDFS:
+ return context.getBean(HdfsService.class, db);
+ case MONGO:
+ return context.getBean(MongodbService.class, db);
+ default:
+ log.error("we should not come here {}", dbTypeEnum);
+ return null;
}
}
public void flush() { //force flush all buffer
- hdfsService.flush();
+// hdfsService.flush();
}
public void flushStall() { //flush stall buffer
- hdfsService.flushStall();
+ // hdfsService.flushStall();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 453b3ee9..8f703b1d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -21,20 +21,23 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -52,45 +55,56 @@ public class TopicConfigPollingService implements Runnable {
ApplicationConfiguration config;
@Autowired
- private DmaapService dmaapService;
+ private ApplicationContext context;
- //effective TopicConfig Map
- private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
+ private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
+ //private Map<String, TopicConfig> effectiveTopicConfigMap;
//monitor Kafka topic list changes
- private List<String> activeTopics;
- private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
- private int currentActiveTopicsVersion = -1;
+ private Map<String, Set<String>> activeTopicMap;
+
+ private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
+ private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
private boolean active = false;
@PostConstruct
private void init() {
try {
- log.info("init(), ccalling poll()...");
- activeTopics = poll();
- currentActiveTopicsVersion++;
+ log.info("init(), calling poll()...");
+ activeTopicMap = poll();
} catch (Exception ex) {
log.error("error connection to HDFS.", ex);
}
}
- public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version
- boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
- log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
- if (changed && update) {
- activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+ public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
+ String kafkaId = kafka.getId();
+ int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
+ int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
+
+ boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
+ log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
+ if (changed) {
+ activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
}
return changed;
}
- public List<String> getActiveTopics(Kafka kafka) {
- return activeTopics;
+ //get a list of topic names to monitor
+ public Collection<String> getActiveTopics(Kafka kafka) {
+ return activeTopicMap.get(kafka.getId());
}
- public TopicConfig getEffectiveTopicConfig(String topicStr) {
- return effectiveTopicConfigMap.get(topicStr);
+ //get the EffectiveTopics given kafka and topic name
+ public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) {
+ Map<String, List<EffectiveTopic>> effectiveTopicMapKafka= effectiveTopicMap.get(kafka.getId());
+ return effectiveTopicMapKafka.get(topicStr);
}
@Override
@@ -100,7 +114,7 @@ public class TopicConfigPollingService implements Runnable {
while (active) {
try { //sleep first since we already pool in init()
- Thread.sleep(config.getDmaapCheckNewTopicInterval());
+ Thread.sleep(config.getCheckTopicInterval());
if(!active) {
break;
}
@@ -110,15 +124,23 @@ public class TopicConfigPollingService implements Runnable {
}
try {
- List<String> newTopics = poll();
- if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
- log.info("activeTopics list is updated, old={}", activeTopics);
- log.info("activeTopics list is updated, new={}", newTopics);
-
- activeTopics = newTopics;
- currentActiveTopicsVersion++;
- } else {
- log.debug("activeTopics list is not updated.");
+ Map<String, Set<String>> newTopicsMap = poll();
+
+ for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
+ String kafkaId = entry.getKey();
+ Set<String> newTopics = entry.getValue();
+
+ Set<String> activeTopics = activeTopicMap.get(kafkaId);
+
+ if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
+ log.info("activeTopics list is updated, old={}", activeTopics);
+ log.info("activeTopics list is updated, new={}", newTopics);
+
+ activeTopicMap.put(kafkaId, newTopics);
+ currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1);
+ } else {
+ log.debug("activeTopics list is not updated.");
+ }
}
} catch (IOException e) {
log.error("dmaapService.getActiveTopics()", e);
@@ -132,17 +154,27 @@ public class TopicConfigPollingService implements Runnable {
active = false;
}
- private List<String> poll() throws IOException {
+ private Map<String, Set<String>> poll() throws IOException {
+ Map<String, Set<String>> ret = new HashMap<>();
+ Iterable<Kafka> kafkas = kafkaRepository.findAll();
+ for (Kafka kafka : kafkas) {
+ if (kafka.isEnabled()) {
+ Set<String> topics = poll(kafka);
+ ret.put(kafka.getId(), topics);
+ }
+ }
+ return ret;
+ }
+
+ private Set<String> poll(Kafka kafka) throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
- List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
- Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>();
- activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
- effectiveTopicConfigMap = tempEffectiveTopicConfigMap;
- log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap);
+ DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
+
+ Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
+ effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
- List<String> ret = new ArrayList<>(activeTopicConfigs.size());
- activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+ Set<String> ret = activeEffectiveTopics.keySet();
return ret;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index dd8664e8..86b27a9a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -21,23 +21,31 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
- * Service for topics
+ * Service for topics
*
* @author Guobiao Mo
*
@@ -49,72 +57,90 @@ public class TopicService {
@Autowired
private ApplicationConfiguration config;
-
+
@Autowired
- private TopicRepository topicRepository;
+ private ApplicationContext context;
@Autowired
- private ElasticsearchService elasticsearchService;
+ private TopicNameRepository topicNameRepository;
+ @Autowired
+ private TopicRepository topicRepository;
@Autowired
private DbRepository dbRepository;
- public TopicConfig getEffectiveTopic(String topicStr) {
- try {
- return getEffectiveTopic(topicStr, false);
- } catch (IOException e) {
- log.error(topicStr, e);
+ public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
+
+ List<Topic> topics = findTopics(kafka, topicStr);
+ if (CollectionUtils.isEmpty(topics)) {
+ topics = new ArrayList<>();
+ topics.add(getDefaultTopic(kafka));
}
- return null;
- }
- public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
- Topic topic = getTopic(topicStr);
- if (topic == null) {
- topic = getDefaultTopic();
+ List<EffectiveTopic> ret = new ArrayList<>();
+ for (Topic topic : topics) {
+ if (!topic.isEnabled()) {
+ continue;
+ }
+ ret.add(new EffectiveTopic(topic, topicStr));
+
+ if (ensureTableExist) {
+ for (Db db : topic.getDbs()) {
+ if (db.isElasticsearch()) {
+ ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db);
+ elasticsearchService.ensureTableExist(topicStr);
+ }
+ }
+ }
}
- TopicConfig topicConfig = topic.getTopicConfig();
- topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic
+
+ return ret;
+ }
+
+ //TODO use query
+ public List<Topic> findTopics(Kafka kafka, String topicStr) {
+ List<Topic> ret = new ArrayList<>();
- if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) {
- elasticsearchService.ensureTableExist(topicStr);
+ Iterable<Topic> allTopics = topicRepository.findAll();
+ for(Topic topic: allTopics) {
+ if(topic.getKafkas().contains(kafka ) && topic.getTopicName().getId().equals(topicStr)){
+ ret.add(topic);
+ }
}
- return topicConfig;
+ return ret;
}
- public Topic getTopic(String topicStr) {
- Optional<Topic> ret = topicRepository.findById(null);//FIXME
+ public Topic getTopic(int topicId) {
+ Optional<Topic> ret = topicRepository.findById(topicId);
return ret.isPresent() ? ret.get() : null;
}
- public Topic getDefaultTopic() {
- return getTopic(config.getDefaultTopicName());
+ public Topic getDefaultTopic(Kafka kafka) {
+ return findTopics(kafka, config.getDefaultTopicName()).get(0);
}
- public boolean istDefaultTopic(Topic topic) {
+ public boolean isDefaultTopic(Topic topic) {
if (topic == null) {
return false;
}
- return true;//topic.getName().equals(config.getDefaultTopicName());
+ return topic.getName().equals(config.getDefaultTopicName());
}
- public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic)
- {
+ public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) {
fillTopic(tConfig, wTopic);
}
- public Topic fillTopicConfiguration(TopicConfig tConfig)
- {
+ public Topic fillTopicConfiguration(TopicConfig tConfig) {
Topic topic = new Topic();
fillTopic(tConfig, topic);
return topic;
}
- private void fillTopic(TopicConfig tConfig, Topic topic)
- {
+ private void fillTopic(TopicConfig tConfig, Topic topic) {
Set<Db> relateDb = new HashSet<>();
- //topic.setName(tConfig.getName());
+ topic.setId(tConfig.getId());
+ topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get());
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
topic.setEnabled(tConfig.isEnabled());
@@ -126,24 +152,21 @@ public class TopicService {
topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
- if(tConfig.getSinkdbs() != null) {
+ if (tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
Db sinkdb = dbRepository.findByName(item);
if (sinkdb != null) {
relateDb.add(sinkdb);
}
}
- if(relateDb.size() > 0)
+ if (relateDb.size() > 0)
topic.setDbs(relateDb);
- else if(relateDb.size() == 0)
- {
+ else if (relateDb.size() == 0) {
topic.getDbs().clear();
}
- }else
- {
+ } else {
topic.setDbs(relateDb);
}
-
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index fc31b2eb..33c8847e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.util.ArrayList;
import java.util.List;
@@ -30,7 +30,8 @@ import javax.annotation.PreDestroy;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,25 +56,33 @@ import rx.functions.Func1;
*
*/
@Service
-public class CouchbaseService {
+public class CouchbaseService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
ApplicationConfiguration config;
-
+
+ private Db couchbase;
+/*
@Autowired
private DbService dbService;
- Bucket bucket;
private boolean isReady = false;
+*/
+ Bucket bucket;
+ public CouchbaseService( ) {
+
+ }
+ public CouchbaseService(Db db) {
+ couchbase = db;
+ }
+
@PostConstruct
private void init() {
// Initialize Couchbase Connection
try {
- Db couchbase = dbService.getCouchbase();
-
//this tunes the SDK (to customize connection timeout)
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s
.build();
@@ -84,10 +93,10 @@ public class CouchbaseService {
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin());
- isReady = true;
+// isReady = true;
} catch (Exception ex) {
log.error("error connection to Couchbase.", ex);
- isReady = false;
+ // isReady = false;
}
}
@@ -103,7 +112,8 @@ public class CouchbaseService {
}
}
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ @Override
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
List<JsonDocument> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
@@ -112,9 +122,9 @@ public class CouchbaseService {
long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
- int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second
+ int expiry = (int) (timestamp / 1000L) + effectiveTopic.getTopic().getTtl() * 3600 * 24; //in second
- String id = getId(topic, json);
+ String id = getId(effectiveTopic.getTopic(), json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
@@ -133,10 +143,10 @@ public class CouchbaseService {
} catch (Exception e) {
log.error("error saving to Couchbase.", e);
}
- log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
+ log.debug("saved text to topic = {}, this batch count = {} ", effectiveTopic, documents.size());
}
- public String getId(TopicConfig topic, JSONObject json) {
+ public String getId(Topic topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
if (id != null) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
new file mode 100644
index 00000000..5ea6e23e
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
@@ -0,0 +1,37 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service.db;
+
+import java.util.List;
+
+import org.json.JSONObject;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+
+/**
+ * Interface for all db store services
+ *
+ * @author Guobiao Mo
+ *
+ */
+public interface DbStoreService {
+
+ void saveJsons(EffectiveTopic topic, List<JSONObject> jsons);
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index b40f544c..aee63ed7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.io.IOException;
import java.util.List;
@@ -47,7 +47,8 @@ import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,24 +62,33 @@ import org.springframework.stereotype.Service;
*
*/
@Service
-public class ElasticsearchService {
+public class ElasticsearchService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private Db elasticsearch;
@Autowired
private ApplicationConfiguration config;
- @Autowired
- private DbService dbService;
+ //@Autowired
+// private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
+
+ public ElasticsearchService( ) {
+
+ }
+ public ElasticsearchService(Db db) {
+ elasticsearch = db;
+ }
//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
private void init() {
- Db elasticsearch = dbService.getElasticsearch();
+ //Db elasticsearch = dbService.getElasticsearch();
String elasticsearchHost = elasticsearch.getHost();
// Initialize the Connection
@@ -130,24 +140,25 @@ public class ElasticsearchService {
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ @Override
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
- if (topic.isCorrelateClearedMessage()) {
- boolean found = correlateClearedMessage(topic, json);
+ if (effectiveTopic.getTopic().isCorrelateClearedMessage()) {
+ boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json);
if (found) {
continue;
}
}
- String id = topic.getMessageId(json); //id can be null
+ String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
- request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
+ request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
- log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
+ log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
if (config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -158,7 +169,7 @@ public class ElasticsearchService {
log.debug(bulkResponse.buildFailureMessage());
}
} catch (IOException e) {
- log.error(topic.getName(), e);
+ log.error(effectiveTopic.getName(), e);
}
}
@@ -175,7 +186,7 @@ public class ElasticsearchService {
* source. So use the get API, three parameters: index, type, document
* id
*/
- private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
+ private boolean correlateClearedMessage(Topic topic, JSONObject json) {
boolean found = false;
String eName = null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
index d92d05ac..0e107fdf 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.io.IOException;
import java.net.InetAddress;
@@ -38,9 +38,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,16 +59,15 @@ import lombok.Setter;
*
*/
@Service
-public class HdfsService {
+public class HdfsService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private Db hdfs;
@Autowired
ApplicationConfiguration config;
- @Autowired
- private DbService dbService;
-
FileSystem fileSystem;
private boolean isReady = false;
@@ -113,6 +113,14 @@ public class HdfsService {
messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
}
+ public void addData2(List<JSONObject> messages) {
+ if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+ lastFlush = System.currentTimeMillis();
+ }
+
+ messages.stream().forEach(message -> data.add(message.toString()));
+ }
+
private void saveMessages(String topic, List<String> bufferList) throws IOException {
long thread = Thread.currentThread().getId();
@@ -144,12 +152,17 @@ public class HdfsService {
}
}
+ public HdfsService( ) {
+ }
+
+ public HdfsService(Db db) {
+ hdfs = db;
+ }
+
@PostConstruct
private void init() {
// Initialize HDFS Connection
try {
- Db hdfs = dbService.getHdfs();
-
//Get configuration of Hadoop system
Configuration hdfsConfig = new Configuration();
@@ -200,7 +213,8 @@ public class HdfsService {
bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
}
- public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) {
+ //used if raw data should be saved
+ public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
String topicStr = topic.getName();
Map<String, Buffer> bufferMap = bufferLocal.get();
@@ -215,4 +229,21 @@ public class HdfsService {
}
}
+ @Override
+ public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
+ String topicStr = topic.getName();
+
+ Map<String, Buffer> bufferMap = bufferLocal.get();
+ final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+ buffer.addData2(jsons);
+
+ if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+ buffer.flush(topicStr);
+ } else {
+ log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+ }
+
+ }
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index f3462e49..0f522f6b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.service;
+package org.onap.datalake.feeder.service.db;
import java.util.ArrayList;
import java.util.HashMap;
@@ -34,7 +34,7 @@ import org.bson.Document;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,26 +59,32 @@ import com.mongodb.client.model.InsertManyOptions;
*
*/
@Service
-public class MongodbService {
+public class MongodbService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private Db mongodb;
@Autowired
private ApplicationConfiguration config;
private boolean dbReady = false;
- @Autowired
- private DbService dbService;
+ //@Autowired
+// private DbService dbService;
private MongoDatabase database;
private MongoClient mongoClient;
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
+ public MongodbService( ) {
+ }
+ public MongodbService(Db db) {
+ mongodb = db;
+ }
+
@PostConstruct
private void init() {
- Db mongodb = dbService.getMongoDB();
-
String host = mongodb.getHost();
Integer port = mongodb.getPort();
@@ -141,7 +147,7 @@ public class MongodbService {
}
}
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
if (dbReady == false)//TOD throw exception
return;
List<Document> documents = new ArrayList<>(jsons.size());
@@ -149,14 +155,14 @@ public class MongodbService {
//convert org.json JSONObject to MongoDB Document
Document doc = Document.parse(json.toString());
- String id = topic.getMessageId(json); //id can be null
+ String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
if (id != null) {
doc.put("_id", id);
}
documents.add(doc);
}
- String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
+ String collectionName = effectiveTopic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
try {
@@ -168,7 +174,7 @@ public class MongodbService {
}
}
- log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
+ log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
}
}