author     2019-06-27 18:42:59 -0700
committer  2019-06-27 18:46:11 -0700
commit     b14c5766902d486a94a8db96d2a31ff0e9e8255e (patch)
tree       421f9bd6ac50f36d5f128bfab7fd3653b9ff8894 /components/datalake-handler/feeder/src/main
parent     b3f5051484f5b973a47a60fb8f76a67ca5ff88da (diff)
Supports multiple Kafka clusters and DBs
Reads data from Kafka and stores it into DBs
Issue-ID: DCAEGEN2-1631
Change-Id: Ic2736b6e0497ac2084b1a7ce0da3a6e0e1379f43
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main')
27 files changed, 748 insertions, 443 deletions
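Editor's note: this commit replaces the single, statically configured DMaaP/Kafka connection with one Kafka entity per cluster, and creates the per-cluster workers at runtime. A minimal sketch of that fan-out, assuming a prototype-scoped Puller(Kafka) bean resolvable with a constructor argument as the Puller and DmaapService diffs below set up (the wiring shown here is simplified, not the committed PullService):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.service.Puller;
import org.springframework.context.ApplicationContext;

public class MultiClusterSketch {
    // One consumer loop per enabled Kafka cluster.
    void startPullers(ApplicationContext context, Iterable<Kafka> clusters, ExecutorService pool) {
        Set<Puller> pullers = new HashSet<>();
        for (Kafka kafka : clusters) {
            if (!kafka.isEnabled()) {
                continue; // skip disabled clusters
            }
            Puller puller = context.getBean(Puller.class, kafka); // ctor-arg bean lookup
            pullers.add(puller);
            pool.submit(puller); // Puller implements Runnable
        }
    }
}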
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index e371af1b..806dc72e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -54,6 +54,8 @@ public class ApplicationConfiguration {
     private String defaultTopicName;
+    private int checkTopicInterval; //in millisecond
+/*
     //DMaaP
     private String dmaapZookeeperHostPort;
     private String dmaapKafkaHostPort;
@@ -68,7 +70,7 @@ public class ApplicationConfiguration {
     private int dmaapCheckNewTopicInterval; //in millisecond
     private int kafkaConsumerCount;
-
+*/
     private String elasticsearchType;
     //HDFS
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
index bd9b742b..322be412 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -27,8 +27,6 @@ import javax.servlet.http.HttpServletResponse;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.repository.DbRepository;
-import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
 import org.onap.datalake.feeder.dto.DbConfig;
 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
 import org.slf4j.Logger;
@@ -59,12 +57,6 @@ public class DbController {
     @Autowired
     private DbRepository dbRepository;
-    @Autowired
-    private TopicRepository topicRepository;
-
-    @Autowired
-    private DbService dbService;
-
     //list all dbs
     @GetMapping("")
     @ResponseBody
@@ -92,11 +84,11 @@
             return null;
         }
-        Db oldDb = dbService.getDb(dbConfig.getName());
+/*        Db oldDb = dbService.getDb(dbConfig.getName());
         if (oldDb != null) {
             sendError(response, 400, "Db already exists: " + dbConfig.getName());
             return null;
-        } else {
+        } else {*/
             Db newdb = new Db();
             newdb.setName(dbConfig.getName());
             newdb.setHost(dbConfig.getHost());
@@ -118,7 +110,7 @@
         retBody.setReturnBody(retMsg);
         retBody.setStatusCode(200);
         return retBody;
-    }
+    //}
 }
 //Show a db
@@ -191,7 +183,7 @@
             return null;
         }
-        Db oldDb = dbService.getDb(dbConfig.getName());
+        Db oldDb = dbRepository.findById(dbConfig.getId()).get();
         if (oldDb == null) {
             sendError(response, 404, "Db not found: " + dbConfig.getName());
             return null;
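Editor's note on the updateDb change just above: Optional.get() throws NoSuchElementException when no row matches, so the following "oldDb == null" check can never fire. A defensive variant (a sketch, not the committed code):

// Sketch: map an absent row to null so the existing 404 branch still runs.
Db oldDb = dbRepository.findById(dbConfig.getId()).orElse(null);
if (oldDb == null) {
    sendError(response, 404, "Db not found: " + dbConfig.getName());
    return null;
}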
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 93cec8bb..1162aedd 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -27,17 +27,18 @@ import java.util.Set;
 import javax.servlet.http.HttpServletResponse;
 import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Kafka;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.controller.domain.PostReturnBody;
 import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.KafkaRepository;
 import org.onap.datalake.feeder.repository.TopicRepository;
-import org.onap.datalake.feeder.service.DbService;
 import org.onap.datalake.feeder.service.DmaapService;
 import org.onap.datalake.feeder.service.TopicService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.http.MediaType;
 import org.springframework.validation.BindingResult;
 import org.springframework.web.bind.annotation.DeleteMapping;
@@ -71,19 +72,27 @@ public class TopicController {
     private final Logger log = LoggerFactory.getLogger(this.getClass());
+    //@Autowired
+    //private DmaapService dmaapService;
+
     @Autowired
-    private DmaapService dmaapService;
+    private ApplicationContext context;
     @Autowired
+    private KafkaRepository kafkaRepository;
+
+    @Autowired
     private TopicRepository topicRepository;
     @Autowired
     private TopicService topicService;
-    @GetMapping("/dmaap")
+    @GetMapping("/dmaap/{kafkaId}")
     @ResponseBody
     @ApiOperation(value = "List all topic names in DMaaP.")
-    public List<String> listDmaapTopics() {
+    public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId) {
+        Kafka kafka = kafkaRepository.findById(kafkaId).get();
+        DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
         return dmaapService.getTopics();
     }
@@ -95,7 +104,7 @@
         List<String> retString = new ArrayList<>();
         for(Topic item : ret) {
-            if(!topicService.istDefaultTopic(item))
+            if(!topicService.isDefaultTopic(item))
                 retString.add(item.getName());
         }
         return retString;
@@ -110,24 +119,25 @@
             sendError(response, 400, "Error parsing Topic: "+result.toString());
             return null;
         }
-        Topic oldTopic = topicService.getTopic(topicConfig.getName());
+        /*Topic oldTopic = topicService.getTopic(topicConfig.getName());
         if (oldTopic != null) {
             sendError(response, 400, "Topic already exists "+topicConfig.getName());
             return null;
-        } else {
+        } else {*/
             Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
             if(wTopic.getTtl() == 0)
                 wTopic.setTtl(3650);
             topicRepository.save(wTopic);
             return mkPostReturnBody(200, wTopic);
-        }
+        //}
+        //FIXME need to connect to Kafka
     }
-    @GetMapping("/{topicName}")
+    @GetMapping("/{topicId}")
     @ResponseBody
     @ApiOperation(value="Get a topic's settings.")
-    public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException {
-        Topic topic = topicService.getTopic(topicName);
+    public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException {
+        Topic topic = topicService.getTopic(topicId);
         if(topic == null) {
             sendError(response, 404, "Topic not found");
             return null;
@@ -137,23 +147,23 @@
     //This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
     //One exception is that old DBs are kept
-    @PutMapping("/{topicName}")
+    @PutMapping("/{topicId}")
     @ResponseBody
     @ApiOperation(value="Update a topic.")
-    public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
+    public PostReturnBody<TopicConfig> updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException {
         if (result.hasErrors()) {
             sendError(response, 400, "Error parsing Topic: "+result.toString());
             return null;
         }
-        if(!topicName.equals(topicConfig.getName()))
+        if(topicId!=topicConfig.getId())
         {
-            sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName());
+            sendError(response, 400, "Topic name mismatch" + topicId + topicConfig);
             return null;
         }
-        Topic oldTopic = topicService.getTopic(topicConfig.getName());
+        Topic oldTopic = topicService.getTopic(topicId);
         if (oldTopic == null) {
             sendError(response, 404, "Topic not found "+topicConfig.getName());
             return null;
@@ -164,14 +174,14 @@
         }
     }
-    @DeleteMapping("/{topicName}")
+    @DeleteMapping("/{topicId}")
     @ResponseBody
-    @ApiOperation(value="Update a topic.")
+    @ApiOperation(value="Delete a topic.")
-    public void deleteTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException
+    public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException
     {
-        Topic oldTopic = topicService.getTopic(topicName);
+        Topic oldTopic = topicService.getTopic(topicId);
         if (oldTopic == null) {
-            sendError(response, 404, "Topic not found "+topicName);
+            sendError(response, 404, "Topic not found "+topicId);
         } else {
             Set<Db> dbRelation = oldTopic.getDbs();
             dbRelation.clear();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index d84b34f8..7059cd09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -32,6 +32,9 @@ import javax.persistence.JoinTable;
 import javax.persistence.ManyToMany;
 import javax.persistence.ManyToOne;
 import javax.persistence.Table;
+
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import lombok.Getter;
 import lombok.Setter;
@@ -51,12 +54,12 @@ public class Db {
     @Id
     @GeneratedValue(strategy = GenerationType.IDENTITY)
     @Column(name = "`id`")
-    private Integer id;
+    private int id;
     @Column(name="`name`")
     private String name;
-    @Column(name="`enabled`")
+    @Column(name="`enabled`", nullable = false)
     private boolean enabled;
     @Column(name="`host`")
@@ -98,13 +101,30 @@ public class Db {
     )
     private Set<Topic> topics;
-    public Db() {
+    public boolean isHdfs() {
+        return isDb(DbTypeEnum.HDFS);
+    }
+
+    public boolean isElasticsearch() {
+        return isDb(DbTypeEnum.ES);
+    }
+
+    public boolean isCouchbase() {
+        return isDb(DbTypeEnum.CB);
+    }
+
+    public boolean isDruid() {
+        return isDb(DbTypeEnum.DRUID);
     }
-    public Db(String name) {
-        this.name = name;
+    public boolean isMongoDB() {
+        return isDb(DbTypeEnum.MONGO);
     }
+    private boolean isDb(DbTypeEnum dbTypeEnum) {
+        return dbTypeEnum.equals(DbTypeEnum.valueOf(dbType.getId()));
+    }
+
     @Override
     public String toString() {
         return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
index 0a88b155..9c83a9cd 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java
@@ -48,14 +48,14 @@ public class DbType {
     @Column(name="`id`")
     private String id;
-    @Column(name="`name`")
+    @Column(name="`name`", nullable = false)
     private String name;
     @Column(name="`default_port`")
     private Integer defaultPort;
-    @Column(name="`tool`")
-    private Boolean tool;
+    @Column(name="`tool`", nullable = false)
+    private boolean tool;
     @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "dbType")
     protected Set<Db> dbs = new HashSet<>();
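Editor's note: the isDb() helper added to Db above couples the db_type table to DbTypeEnum. DbTypeEnum.valueOf(dbType.getId()) only works if the id column stores the enum constant names exactly (CB, DRUID, ES, HDFS, MONGO, KIBANA, SUPERSET); any other id throws IllegalArgumentException at runtime. For illustration (ids hypothetical):

// Illustration of the id-to-enum contract, with hypothetical ids.
DbTypeEnum es = DbTypeEnum.valueOf("ES");             // ok: matches a constant name
DbTypeEnum bad = DbTypeEnum.valueOf("Elasticsearch"); // throws IllegalArgumentException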
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
new file mode 100644
index 00000000..df7aad04
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
@@ -0,0 +1,64 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+/**
+ * A wrapper of parent Topic
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class EffectiveTopic {
+    private Topic topic; //base Topic
+
+    String name;
+
+    public EffectiveTopic(Topic baseTopic) {
+        topic = baseTopic;
+    }
+
+    public EffectiveTopic(Topic baseTopic, String name) {
+        topic = baseTopic;
+        this.name = name;
+    }
+
+    public String getName() {
+        return name==null?topic.getName():name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Topic getTopic() {
+        return topic;
+    }
+
+    public void setTopic(Topic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("EffectiveTopic %s (base Topic=%s)", getName(), topic.toString());
+    }
+
+}
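Editor's note: EffectiveTopic lets one stored Topic row (typically the shared default topic) stand in for many Kafka topics — the wrapper carries the actual Kafka topic name while delegating all settings to the base row. Illustration of the name fallback (topic name illustrative):

Topic base = new Topic(); // e.g. the shared default Topic row
EffectiveTopic et = new EffectiveTopic(base, "unauthenticated.SEC_FAULT_OUTPUT");
String name = et.getName(); // "unauthenticated.SEC_FAULT_OUTPUT"; with no override it delegates to base.getName()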
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
index e3347a4a..d2189cbc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -49,23 +49,23 @@ public class Kafka {
     @Column(name="`id`")
     private String id;
-    @Column(name="`name`")
+    @Column(name="`name`", nullable = false)
     private String name;
-    @Column(name="`enabled`")
+    @Column(name="`enabled`", nullable = false)
     private boolean enabled;
-    @Column(name="broker_list")
+    @Column(name="broker_list", nullable = false)
     private String brokerList;//message-router-kafka:9092,message-router-kafka2:9092
-    @Column(name="`zk`")
+    @Column(name="`zk`", nullable = false)
     private String zooKeeper;//message-router-zookeeper:2181
     @Column(name="`group`", columnDefinition = "varchar(255) DEFAULT 'datalake'")
     private String group;
     @Column(name="`secure`", columnDefinition = " bit(1) DEFAULT 0")
-    private Boolean secure;
+    private boolean secure;
     @Column(name="`login`")
     private String login;
@@ -81,8 +81,7 @@ public class Kafka {
     @Column(name="`included_topic`")
     private String includedTopic;
-    //@Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'")
-    @Column(name="`excluded_topic`")
+    @Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'")
     private String excludedTopic;
     @Column(name="`consumer_count`", columnDefinition = "integer default 3")
@@ -93,8 +92,8 @@ public class Kafka {
     private Integer timeout;
     //don't show this field in admin UI
-    @Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10")
-    private Integer checkTopicInterval;
+    //@Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10")
+//    private Integer checkTopicInterval;
     @JsonBackReference
     @ManyToMany(fetch = FetchType.EAGER)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index cb07e140..a27b6756 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -20,6 +20,7 @@ package org.onap.datalake.feeder.domain;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -33,7 +34,16 @@ import javax.persistence.ManyToMany;
 import javax.persistence.ManyToOne;
 import javax.persistence.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
 import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
 import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -71,30 +81,30 @@ public class Topic {
     //@JsonManagedReference
     @ManyToMany(fetch = FetchType.EAGER)
     @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
-    protected Set<Db> dbs;
+    protected Set<Db> dbs=new HashSet<>();
     @ManyToMany(fetch = FetchType.EAGER)
     @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
-    protected Set<Kafka> kafkas;
+    protected Set<Kafka> kafkas=new HashSet<>();
     /**
      * indicate if we should monitor this topic
      */
-    @Column(name = "`enabled`")
-    private Boolean enabled;
+    @Column(name = "`enabled`", nullable = false)
+    private boolean enabled;
     /**
      * save raw message text
      */
-    @Column(name = "`save_raw`")
-    private Boolean saveRaw;
+    @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+    private boolean saveRaw;
     /**
      * need to explicitly tell feeder the data format of the message. support JSON,
      * XML, YAML, TEXT
      */
     @Column(name = "`data_format`")
-    private String dataFormat;
+    protected String dataFormat;
     /**
      * TTL in day
      */
@@ -103,41 +113,33 @@ public class Topic {
     private Integer ttl;
     //if this flag is true, need to correlate alarm cleared message to previous alarm
-    @Column(name = "`correlate_cleared_message`")
-    private Boolean correlateClearedMessage;
+    @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
+    private boolean correlateClearedMessage;
     //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
     @Column(name = "`message_id_path`")
-    private String messageIdPath;
+    protected String messageIdPath;
     //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
-    @Column(name = "`aggregate_array_path`")
-    private String aggregateArrayPath;
+    @Column(name = "`aggregate_array_path`")
+    protected String aggregateArrayPath;
     //paths to the element in array that need flatten, this element is used as label, comma separated,
     //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
- @Column(name = "`flatten_array_path`") - private String flattenArrayPath; + @Column(name = "`flatten_array_path`") + protected String flattenArrayPath; public Topic() { } - +/* public Topic(String name) {//TODO //this.name = name; } - +*/ public String getName() { return topicName.getId(); } - public boolean isEnabled() { - return is(enabled); - } - - public boolean isCorrelateClearedMessage() { - return is(correlateClearedMessage); - } - public int getTtl() { if (ttl != null) { return ttl; @@ -145,27 +147,86 @@ public class Topic { return 3650;//default to 10 years for safe } } +/* + public boolean supportHdfs() { + return supportDb(DbTypeEnum.HDFS); + } + + public boolean supportElasticsearch() { + return supportDb(DbTypeEnum.ES); + } + + public boolean supportCouchbase() { + return supportDb(DbTypeEnum.CB); + } - private boolean is(Boolean b) { - return is(b, false); + public boolean supportDruid() { + return supportDb(DbTypeEnum.DRUID); } - private boolean is(Boolean b, boolean defaultValue) { - if (b != null) { - return b; + public boolean supportMongoDB() { + return supportDb(DbTypeEnum.MONGO); + } + + private boolean supportDb(DbTypeEnum dbTypeEnum) { + for(Db db : dbs) { + + } + } +*/ + public DataFormat getDataFormat2() { + if (dataFormat != null) { + return DataFormat.fromString(dataFormat); } else { - return defaultValue; + return null; + } + } + + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); + } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); } + + return ret; } - public boolean isSaveRaw() { - return is(saveRaw); + //extract DB id from JSON attributes, support multiple attributes + public String getMessageId(JSONObject json) { + String id = null; + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); + } + + return id; } public TopicConfig getTopicConfig() { TopicConfig tConfig = new TopicConfig(); - //tConfig.setName(getName()); + tConfig.setId(getId()); + tConfig.setName(getName()); tConfig.setLogin(getLogin()); tConfig.setEnabled(isEnabled()); tConfig.setDataFormat(dataFormat); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java index 0b6c54c3..eff87114 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java @@ -33,6 +33,7 @@ import lombok.Setter; @Getter @Setter public class DbConfig { + private int id; private String name; private String host; private boolean enabled; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 1fffa7ec..ace7bfa9 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -41,6 +41,7 
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
index 0b6c54c3..eff87114 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java
@@ -33,6 +33,7 @@ import lombok.Setter;
 @Getter
 @Setter
 public class DbConfig {
+    private int id;
     private String name;
     private String host;
     private boolean enabled;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 1fffa7ec..ace7bfa9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -41,6 +41,7 @@ import org.onap.datalake.feeder.enumeration.DataFormat;
 public class TopicConfig {
+    private int id;
     private String name;
     private String login;
     private String password;
@@ -54,79 +55,7 @@ public class TopicConfig {
     private String messageIdPath;
     private String aggregateArrayPath;
     private String flattenArrayPath;
-
-    public DataFormat getDataFormat2() {
-        if (dataFormat != null) {
-            return DataFormat.fromString(dataFormat);
-        } else {
-            return null;
-        }
-    }
-
-    public boolean supportHdfs() {
-        return supportDb("HDFS");
-    }
-
-    public boolean supportElasticsearch() {
-        return supportDb("Elasticsearch");//TODO string hard codes
-    }
-
-    public boolean supportCouchbase() {
-        return supportDb("Couchbase");
-    }
-
-    public boolean supportDruid() {
-        return supportDb("Druid");
-    }
-
-    public boolean supportMongoDB() {
-        return supportDb("MongoDB");
-    }
-
-    private boolean supportDb(String dbName) {
-        return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName));
-    }
-
-    //extract DB id from JSON attributes, support multiple attributes
-    public String getMessageId(JSONObject json) {
-        String id = null;
-
-        if (StringUtils.isNotBlank(messageIdPath)) {
-            String[] paths = messageIdPath.split(",");
-
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < paths.length; i++) {
-                if (i > 0) {
-                    sb.append('^');
-                }
-                sb.append(json.query(paths[i]).toString());
-            }
-            id = sb.toString();
-        }
-
-        return id;
-    }
-
-    public String[] getAggregateArrayPath2() {
-        String[] ret = null;
-
-        if (StringUtils.isNotBlank(aggregateArrayPath)) {
-            ret = aggregateArrayPath.split(",");
-        }
-
-        return ret;
-    }
-
-    public String[] getFlattenArrayPath2() {
-        String[] ret = null;
-
-        if (StringUtils.isNotBlank(flattenArrayPath)) {
-            ret = flattenArrayPath.split(",");
-        }
-
-        return ret;
-    }
-
+
     @Override
     public String toString() {
         return String.format("TopicConfig %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
index 9b1eb23b..05d76d55 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
@@ -26,7 +26,7 @@ package org.onap.datalake.feeder.enumeration;
 *
 */
 public enum DbTypeEnum {
-    CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana");
+    CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset");
     private final String name;
@@ -34,12 +34,4 @@ public enum DbTypeEnum {
         this.name = name;
     }
-    public static DbTypeEnum fromString(String s) {
-        for (DbTypeEnum df : DbTypeEnum.values()) {
-            if (df.name.equalsIgnoreCase(s)) {
-                return df;
-            }
-        }
-        throw new IllegalArgumentException("Invalid value for db: " + s);
-    }
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
new file mode 100644
index 00000000..157fbf94
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
@@ -0,0 +1,38 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+/**
+ * Design type
+ *
+ * @author Guobiao Mo
+ *
+ */
+public enum DesignTypeEnum {
+    KIBANA_DB("Kibana Dashboard"), KIBANA_SEARCH("Kibana Search"), KIBANA_VISUAL("Kibana Visualization"),
+    ES_MAPPING("Elasticsearch Field Mapping Template"), DRUID_KAFKA_SPEC("Druid Kafka Indexing Service Supervisor Spec");
+
+    private final String name;
+
+    DesignTypeEnum(String name) {
+        this.name = name;
+    }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
new file mode 100644
index 00000000..9f8ea8a9
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
@@ -0,0 +1,35 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.TopicName;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ *
+ * TopicName Repository
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public interface TopicNameRepository extends CrudRepository<TopicName, String> {
+
+}
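Editor's note: Spring Data generates the implementation of this interface at startup; the feeder only needs the inherited CrudRepository methods. Typical use (a sketch; topic name illustrative):

// Sketch: look up a topic name row by its primary key (the name string itself).
Optional<TopicName> tn = topicNameRepository.findById("AAI-EVENT");
tn.ifPresent(t -> log.info("topic name row exists: {}", t));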
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 182bf6f1..b4dd6374 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -19,6 +19,9 @@
 */
package org.onap.datalake.feeder.repository;
+import java.util.List;
+
+import org.onap.datalake.feeder.domain.Portal;
import org.onap.datalake.feeder.domain.Topic;
import org.springframework.data.repository.CrudRepository;
@@ -32,5 +35,5 @@ import org.springframework.data.repository.CrudRepository;
 */
public interface TopicRepository extends CrudRepository<Topic, Integer> {
-
+ //List<Topic> findByTopicName(String topicStr);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index 6d6fb750..2e934e2e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -20,9 +20,6 @@ package org.onap.datalake.feeder.service;
-import java.util.Optional;
-
-import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.repository.DbRepository;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -38,29 +35,4 @@ public class DbService {
     @Autowired
     private DbRepository dbRepository;
-
-    public Db getDb(String name) {
-        return dbRepository.findByName(name);
-    }
-
-    public Db getCouchbase() {
-        return getDb("Couchbase");
-    }
-
-    public Db getElasticsearch() {
-        return getDb("Elasticsearch");
-    }
-
-    public Db getMongoDB() {
-        return getDb("MongoDB");
-    }
-
-    public Db getDruid() {
-        return getDb("Druid");
-    }
-
-    public Db getHdfs() {
-        return getDb("HDFS");
-    }
-
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 5c544d6c..1bfd437f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import javax.annotation.PostConstruct;
@@ -35,6 +37,8 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.domain.Kafka;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +64,12 @@ public class DmaapService {
     private ZooKeeper zk;
+    private Kafka kafka;
+
+    public DmaapService(Kafka kafka) {
+        this.kafka = kafka;
+    }
+
     @PreDestroy
     public void cleanUp() throws InterruptedException {
         config.getShutdownLock().readLock().lock();
@@ -76,7 +86,7 @@
     @PostConstruct
     private void init() throws IOException, InterruptedException {
-        zk = connect(config.getDmaapZookeeperHostPort());
+        zk = connect(kafka.getZooKeeper());
     }
     //get all topic names from Zookeeper
@@ -84,11 +94,11 @@
     public List<String> getTopics() {
         try {
             if (zk == null) {
-                zk = connect(config.getDmaapZookeeperHostPort());
+                zk = connect(kafka.getZooKeeper());
             }
-            log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
+            log.info("connecting to ZooKeeper {} for a list of topics.", kafka.getZooKeeper());
             List<String> topics = zk.getChildren("/brokers/topics", false);
-            String[] excludes = config.getDmaapKafkaExclude();
+            String[] excludes = kafka.getExcludedTopic().split(",");
             topics.removeAll(Arrays.asList(excludes));
             log.info("list of topics: {}", topics);
             return topics;
@@ -100,7 +110,7 @@
     }
     private ZooKeeper connect(String host) throws IOException, InterruptedException {
-        log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+        log.info("connecting to ZooKeeper {} ...", kafka.getZooKeeper());
         CountDownLatch connectedSignal = new CountDownLatch(1);
         ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
             public void process(WatchedEvent we) {
@@ -126,18 +136,18 @@
         return ret;
     }
 */
-    public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+    public Map<String, List<EffectiveTopic>> getActiveEffectiveTopic() throws IOException {
         log.debug("entering getActiveTopicConfigs()...");
-        List<String> allTopics = getTopics();
+        List<String> allTopics = getTopics(); //topics in Kafka cluster TODO update table topic_name with new topics
-        List<TopicConfig> ret = new ArrayList<>(allTopics.size());
+        Map<String, List<EffectiveTopic>> ret = new HashMap<>();
         for (String topicStr : allTopics) {
             log.debug("get topic setting from DB: {}.", topicStr);
-            TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
-            if (topicConfig.isEnabled()) {
-                ret.add(topicConfig);
-            }
+            List<EffectiveTopic> effectiveTopics = topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
+
+            ret.put(topicStr, effectiveTopics);
+
         }
         return ret;
     }
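Editor's note: getTopics() now reads the topic list from the cluster's own ZooKeeper (kafka.getZooKeeper()) instead of a global setting, then drops the names listed in excluded_topic. Stripped of the service wiring, the core call is (a sketch):

import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class TopicListSketch {
    // Sketch: Kafka registers topics under /brokers/topics in ZooKeeper.
    static List<String> listTopics(String zkHostPort) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkHostPort, 10000, event -> { });
        try {
            List<String> topics = zk.getChildren("/brokers/topics", false);
            topics.removeAll(Arrays.asList("__consumer_offsets", "__transaction_state"));
            return topics;
        } finally {
            zk.close();
        }
    }
}

Note that kafka.getExcludedTopic().split(",") assumes the column is non-null; the columnDefinition default restored on excluded_topic in Kafka.java covers freshly created rows.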
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
index df701e88..408e4971 100755
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java
@@ -23,15 +23,27 @@ package org.onap.datalake.feeder.service;
 import java.util.ArrayList;
 import java.util.List;
import java.util.Optional;
+import java.util.Set;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.domain.DesignType;
import org.onap.datalake.feeder.domain.Portal;
import org.onap.datalake.feeder.domain.PortalDesign;
import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.domain.TopicName;
import org.onap.datalake.feeder.dto.PortalDesignConfig;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.enumeration.DesignTypeEnum;
import org.onap.datalake.feeder.repository.DesignTypeRepository;
import org.onap.datalake.feeder.repository.PortalDesignRepository;
+import org.onap.datalake.feeder.repository.TopicNameRepository;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.HttpClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +63,11 @@ public class PortalDesignService {
 static String POST_FLAG;
- @Autowired
- private PortalDesignRepository portalDesignRepository;
+ @Autowired
+ private PortalDesignRepository portalDesignRepository;
- @Autowired
- private TopicService topicService;
+ @Autowired
+ private TopicNameRepository topicNameRepository;
@Autowired
private DesignTypeRepository designTypeRepository;
@@ -63,17 +75,13 @@ public class PortalDesignService {
 @Autowired
private ApplicationConfiguration applicationConfiguration;
- @Autowired
- private DbService dbService;
-
- public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception
- {
+ public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception {
PortalDesign portalDesign = new PortalDesign();
fillPortalDesign(portalDesignConfig, portalDesign);
return portalDesign;
}
- public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception
- {
+
+ public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception {
fillPortalDesign(portalDesignConfig, portalDesign);
}
@@ -86,32 +94,34 @@ public class PortalDesignService {
 portalDesign.setSubmitted(portalDesignConfig.getSubmitted());
if (portalDesignConfig.getTopic() != null) {
- Topic topic = topicService.getTopic(portalDesignConfig.getTopic());
- if (topic == null) throw new IllegalArgumentException("topic is null");
- portalDesign.setTopicName(topic.getTopicName());
- }else {
- throw new IllegalArgumentException("Can not find topic in DB, topic name: "+portalDesignConfig.getTopic());
+ Optional<TopicName> topicName = topicNameRepository.findById(portalDesignConfig.getTopic());
+ if (topicName.isPresent()) {
+ portalDesign.setTopicName(topicName.get());
+ } else {
+ throw new IllegalArgumentException("topic is null " + portalDesignConfig.getTopic());
+ }
+ } else {
+ throw new IllegalArgumentException("Can not find topic in DB, topic name: " + portalDesignConfig.getTopic());
}
if (portalDesignConfig.getDesignType() != null) {
DesignType designType = designTypeRepository.findById(portalDesignConfig.getDesignType()).get();
- if (designType == null) throw new IllegalArgumentException("designType is null");
+ if (designType == null)
+ throw new IllegalArgumentException("designType is null");
portalDesign.setDesignType(designType);
- }else {
- throw new IllegalArgumentException("Can not find designType in Design_type, designType name "+portalDesignConfig.getDesignType());
+ } else {
+ throw new IllegalArgumentException("Can not find designType in Design_type, designType name " + portalDesignConfig.getDesignType());
}
}
-
public PortalDesign getPortalDesign(Integer id) {
-
+
Optional<PortalDesign> ret = portalDesignRepository.findById(id);
return ret.isPresent() ? ret.get() : null;
}
-
- public List<PortalDesignConfig> queryAllPortalDesign(){
+ public List<PortalDesignConfig> queryAllPortalDesign() {
List<PortalDesign> portalDesignList = null;
List<PortalDesignConfig> portalDesignConfigList = new ArrayList<>();
@@ -125,30 +135,21 @@ public class PortalDesignService {
 return portalDesignConfigList;
}
-
- public boolean deploy(PortalDesign portalDesign){
- boolean flag =true;
- String designTypeName = portalDesign.getDesignType().getName();
- if (portalDesign.getDesignType() != null && "kibana_db".equals(designTypeName)) {
- flag = deployKibanaImport(portalDesign);
- } else if (portalDesign.getDesignType() != null && "kibana_visual".equals(designTypeName)) {
- //TODO
- flag =false;
- } else if (portalDesign.getDesignType() != null && "kibana_search".equals(designTypeName)) {
- //TODO
- flag = false;
- } else if (portalDesign.getDesignType() != null && "es_mapping".equals(designTypeName)) {
- flag = postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase());
- } else if (portalDesign.getDesignType() != null && "druid_kafka_spec".equals(designTypeName)) {
- //TODO
- flag =false;
- } else {
- flag =false;
+ public boolean deploy(PortalDesign portalDesign) {
+ DesignType designType = portalDesign.getDesignType();
+ DesignTypeEnum designTypeEnum = DesignTypeEnum.valueOf(designType.getId());
+
+ switch (designTypeEnum) {
+ case KIBANA_DB:
+ return deployKibanaImport(portalDesign);
+ case ES_MAPPING:
+ return postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase());
+ default:
+ log.error("Not implemented {}", designTypeEnum);
+ return false;
}
- return flag;
}
-
private boolean deployKibanaImport(PortalDesign portalDesign) throws RuntimeException {
POST_FLAG = "KibanaDashboardImport";
String requestBody = portalDesign.getBody();
@@ -168,20 +169,16 @@ public class PortalDesignService {
 }
-
- private String kibanaImportUrl(String host, Integer port){
+ private String kibanaImportUrl(String host, Integer port) {
if (port == null) {
port = applicationConfiguration.getKibanaPort();
}
- return "http://"+host+":"+port+applicationConfiguration.getKibanaDashboardImportApi();
+ return "http://" + host + ":" + port + applicationConfiguration.getKibanaDashboardImportApi();
}
-
/**
- * successed resp:
- * {
- * "acknowledged": true
- * }
+ * successful resp: { "acknowledged": true }
+ *
* @param portalDesign
* @param templateName
* @return flag
@@ -189,7 +186,13 @@ public class PortalDesignService {
 public boolean postEsMappingTemplate(PortalDesign portalDesign, String templateName) throws RuntimeException {
POST_FLAG = "ElasticsearchMappingTemplate";
String requestBody = portalDesign.getBody();
- return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG);
+
+ //FIXME
+ Set<Db> dbs = portalDesign.getDbs();
+ //submit to each ES in dbs
+
+ //return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG);
+ return false;
}
}
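Editor's note: postEsMappingTemplate() is left with a FIXME — the old code posted to the single DbService-resolved Elasticsearch, while a design now carries a set of target DBs. One possible completion, reusing HttpClientUtil.sendPostHttpClient() as the commented-out line does (a hypothetical sketch, not the committed code; port hardcoded to 9200 as before):

// Hypothetical completion of the FIXME: post the template to every enabled
// Elasticsearch instance attached to this design.
boolean ok = true;
for (Db db : dbs) {
    if (db.isElasticsearch() && db.isEnabled()) {
        String url = "http://" + db.getHost() + ":9200/_template/" + templateName;
        ok = HttpClientUtil.sendPostHttpClient(url, requestBody, POST_FLAG) && ok;
    }
}
return ok;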
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index dc04cf60..65de0bdc 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -50,7 +50,7 @@ public class PullService {
     private boolean isRunning = false;
     private ExecutorService executorService;
-    private Thread topicConfigPollingThread;
+//    private Thread topicConfigPollingThread;
     private Set<Puller> pullers;
     @Autowired
@@ -94,10 +94,11 @@
         }
     }
-    topicConfigPollingThread = new Thread(topicConfigPollingService);
+    executorService.submit(topicConfigPollingService);
+    /*topicConfigPollingThread = new Thread(topicConfigPollingService);
     topicConfigPollingThread.setName("TopicConfigPolling");
     topicConfigPollingThread.start();
-
+*/
     isRunning = true;
     Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
@@ -126,11 +127,12 @@
         puller.shutdown();
     }
-    logger.info("stop TopicConfigPollingService ...");
-    topicConfigPollingService.shutdown();
+//    logger.info("stop TopicConfigPollingService ...");
+//    topicConfigPollingService.shutdown();
-    topicConfigPollingThread.join();
+    // topicConfigPollingThread.join();
+    logger.info("stop executorService ...");
     executorService.shutdown();
     executorService.awaitTermination(120L, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 5cc3b55d..1550e531 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -29,7 +29,6 @@ import java.util.Properties;
 import javax.annotation.PostConstruct;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -54,7 +53,6 @@ import org.springframework.stereotype.Service;
 */
 @Service
-//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public class Puller implements Runnable {
     @Autowired
@@ -75,6 +73,9 @@
     private Kafka kafka;
+    public Puller() {
+
+    }
     public Puller(Kafka kafka) {
         this.kafka = kafka;
     }
@@ -84,11 +85,11 @@
         async = config.isAsync();
     }
-    private Properties getConsumerConfig() {//00
+    private Properties getConsumerConfig() {
         Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, kafka.getGroup());
         consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
         consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -96,10 +97,10 @@
         consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
         consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) {
-            String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;";
+        if (kafka.isSecure()) {
+            String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + kafka.getLogin() + " password=" + kafka.getPass() + " serviceName=kafka;";
             consumerConfig.put("sasl.jaas.config", jaas);
-            consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol());
+            consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafka.getSecurityProtocol());
             consumerConfig.put("sasl.mechanism", "PLAIN");
         }
         return consumerConfig;
@@ -120,8 +121,8 @@
         try {
             while (active) {
-                if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
-                    List<String> topics = topicConfigPollingService.getActiveTopics(kafka);//00
+                if (topicConfigPollingService.isActiveTopicsChanged(kafka)) {
+                    Collection<String> topics = topicConfigPollingService.getActiveTopics(kafka);
                     log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
                     consumer.subscribe(topics, rebalanceListener);
                 }
@@ -141,7 +142,7 @@
             KafkaConsumer<String, String> consumer = consumerLocal.get();
             log.debug("pulling...");
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(kafka.getTimeout()));
             log.debug("done pulling.");
             if (records != null && records.count() > 0) {
@@ -153,7 +154,7 @@
                 messages.add(Pair.of(record.timestamp(), record.value()));
                 //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
             }
-            storeService.saveMessages(kafka, partition.topic(), messages);//00
+            storeService.saveMessages(kafka, partition.topic(), messages);
             log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
             if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
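Editor's note: with per-cluster credentials, the consumer security block keys off kafka.isSecure() instead of a global login. A standalone rendering of the resulting properties (a sketch; note that standard JAAS syntax quotes the values, which the string concatenation in the diff does not):

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerConfigSketch {
    // Sketch: per-cluster consumer properties mirroring getConsumerConfig().
    static Properties forCluster(String brokers, String group, String login, String pass) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        p.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + login + "\" password=\"" + pass + "\" serviceName=\"kafka\";");
        p.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); // protocol illustrative
        p.put("sasl.mechanism", "PLAIN");
        return p;
    }
}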
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 291f1cad..f5a7698d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.PostConstruct;
@@ -32,13 +34,23 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
 import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,19 +73,10 @@ public class StoreService {
     private ApplicationConfiguration config;
     @Autowired
-    private TopicConfigPollingService configPollingService;
-
-    @Autowired
-    private MongodbService mongodbService;
+    private ApplicationContext context;
     @Autowired
-    private CouchbaseService couchbaseService;
-
-    @Autowired
-    private ElasticsearchService elasticsearchService;
-
-    @Autowired
-    private HdfsService hdfsService;
+    private TopicConfigPollingService configPollingService;
     private ObjectMapper yamlReader;
@@ -87,23 +90,41 @@
         return;
     }
-    TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
+    Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
+    for(EffectiveTopic effectiveTopic:effectiveTopics) {
+        saveMessagesForTopic(effectiveTopic, messages);
+    }
+}
+
+private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
+    if (!effectiveTopic.getTopic().isEnabled()) {
+        log.error("we should not come here {}", effectiveTopic);
+        return;
+    }
     List<JSONObject> docs = new ArrayList<>();
     for (Pair<Long, String> pair : messages) {
         try {
-            docs.add(messageToJson(topicConfig, pair));
+            docs.add(messageToJson(effectiveTopic, pair));
         } catch (Exception e) {
             //may see org.json.JSONException.
log.error("Error when converting this message to JSON: " + pair.getRight(), e); } } - saveJsons(topicConfig, docs, messages); + Set<Db> dbs = effectiveTopic.getTopic().getDbs(); + + for (Db db : dbs) { + if (db.getDbType().isTool() || !db.isEnabled()) { + continue; + } + DbStoreService dbStoreService = findDbStoreService(db); + dbStoreService.saveJsons(effectiveTopic, docs); + } } - private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException { + private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -114,11 +135,11 @@ public class StoreService { // log.debug("{} ={}", topicStr, text); //} - boolean storeRaw = topicConfig.isSaveRaw(); + boolean storeRaw = effectiveTopic.getTopic().isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topicConfig.getDataFormat2(); + DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2(); switch (dataFormat) { case JSON: @@ -149,15 +170,15 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } - if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { - String[] paths = topicConfig.getAggregateArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) { + String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2(); for (String path : paths) { JsonUtil.arrayAggregate(path, json); } } - if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { - String[] paths = topicConfig.getFlattenArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) { + String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2(); for (String path : paths) { JsonUtil.flattenArray(path, json); } @@ -166,29 +187,29 @@ public class StoreService { return json; } - private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) { - if (topic.supportMongoDB()) { - mongodbService.saveJsons(topic, jsons); - } - - if (topic.supportCouchbase()) { - couchbaseService.saveJsons(topic, jsons); - } - - if (topic.supportElasticsearch()) { - elasticsearchService.saveJsons(topic, jsons); - } - - if (topic.supportHdfs()) { - hdfsService.saveMessages(topic, messages); + private DbStoreService findDbStoreService(Db db) { + DbType dbType = db.getDbType(); + DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId()); + switch (dbTypeEnum) { + case CB: + return context.getBean(CouchbaseService.class, db); + case ES: + return context.getBean(ElasticsearchService.class, db); + case HDFS: + return context.getBean(HdfsService.class, db); + case MONGO: + return context.getBean(MongodbService.class, db); + default: + log.error("we should not come here {}", dbTypeEnum); + return null; } } public void flush() { //force flush all buffer - hdfsService.flush(); +// hdfsService.flush(); } public void flushStall() { //flush stall buffer - hdfsService.flushStall(); + // hdfsService.flushStall(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 453b3ee9..8f703b1d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ 
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 453b3ee9..8f703b1d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -21,20 +21,23 @@ package org.onap.datalake.feeder.service;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.PostConstruct;
 import org.apache.commons.collections.CollectionUtils;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
 import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 /**
@@ -52,45 +55,56 @@ public class TopicConfigPollingService implements Runnable {
     ApplicationConfiguration config;
     @Autowired
-    private DmaapService dmaapService;
+    private ApplicationContext context;
-    //effective TopicConfig Map
-    private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
+    @Autowired
+    private KafkaRepository kafkaRepository;
+
+    //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
+    private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
+    //private Map<String, TopicConfig> effectiveTopicConfigMap;
     //monitor Kafka topic list changes
-    private List<String> activeTopics;
-    private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
-    private int currentActiveTopicsVersion = -1;
+    private Map<String, Set<String>> activeTopicMap;
+
+    private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
+    private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
     private boolean active = false;
     @PostConstruct
     private void init() {
         try {
-            log.info("init(), ccalling poll()...");
-            activeTopics = poll();
-            currentActiveTopicsVersion++;
+            log.info("init(), calling poll()...");
+            activeTopicMap = poll();
         } catch (Exception ex) {
             log.error("error connection to HDFS.", ex);
         }
     }
-    public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version
-        boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
-        log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
-        if (changed && update) {
-            activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+    public boolean isActiveTopicsChanged(Kafka kafka) {
+        String kafkaId = kafka.getId();
+        int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
+        int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
+
+        boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion;
+        log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion);
+        if (changed) {
+            activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion);
         }
         return changed;
     }
-    public List<String> getActiveTopics(Kafka kafka) {
-        return activeTopics;
+    //get a list of topic names to monitor
- public List<String> getActiveTopics(Kafka kafka) { - return activeTopics; + //get a list of topic names to monitor + public Collection<String> getActiveTopics(Kafka kafka) { + return activeTopicMap.get(kafka.getId()); } - public TopicConfig getEffectiveTopicConfig(String topicStr) { - return effectiveTopicConfigMap.get(topicStr); + //get the EffectiveTopics given kafka and topic name + public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) { + Map<String, List<EffectiveTopic>> effectiveTopicMapKafka = effectiveTopicMap.get(kafka.getId()); + return effectiveTopicMapKafka.get(topicStr); } @Override @@ -100,7 +114,7 @@ public class TopicConfigPollingService implements Runnable { while (active) { try { //sleep first since we already poll in init() - Thread.sleep(config.getDmaapCheckNewTopicInterval()); + Thread.sleep(config.getCheckTopicInterval()); if(!active) { break; } @@ -110,15 +124,23 @@ public class TopicConfigPollingService implements Runnable { } try { - List<String> newTopics = poll(); - if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { - log.info("activeTopics list is updated, old={}", activeTopics); - log.info("activeTopics list is updated, new={}", newTopics); - - activeTopics = newTopics; - currentActiveTopicsVersion++; - } else { - log.debug("activeTopics list is not updated."); + Map<String, Set<String>> newTopicsMap = poll(); + + for (Map.Entry<String, Set<String>> entry : newTopicsMap.entrySet()) { + String kafkaId = entry.getKey(); + Set<String> newTopics = entry.getValue(); + + Set<String> activeTopics = activeTopicMap.get(kafkaId); + + if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { + log.info("activeTopics list is updated, old={}", activeTopics); + log.info("activeTopics list is updated, new={}", newTopics); + + activeTopicMap.put(kafkaId, newTopics); + currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1) + 1); + } else { + log.debug("activeTopics list is not updated."); + } } } catch (IOException e) { log.error("dmaapService.getActiveTopics()", e); @@ -132,17 +154,27 @@ public class TopicConfigPollingService implements Runnable { active = false; } - private List<String> poll() throws IOException { + private Map<String, Set<String>> poll() throws IOException { + Map<String, Set<String>> ret = new HashMap<>(); + Iterable<Kafka> kafkas = kafkaRepository.findAll(); + for (Kafka kafka : kafkas) { + if (kafka.isEnabled()) { + Set<String> topics = poll(kafka); + ret.put(kafka.getId(), topics); + } + } + return ret; + } + + private Set<String> poll(Kafka kafka) throws IOException { log.debug("poll(), use dmaapService to getActiveTopicConfigs..."); - List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs(); - Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>(); - activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); - effectiveTopicConfigMap = tempEffectiveTopicConfigMap; - log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap); + DmaapService dmaapService = context.getBean(DmaapService.class, kafka); + + Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic(); + effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics); - List<String> ret = new ArrayList<>(activeTopicConfigs.size()); - activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName())); + Set<String> ret = activeEffectiveTopics.keySet(); return ret; }
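The poll(Kafka) method obtains a DmaapService per cluster via context.getBean(DmaapService.class, kafka). Passing a constructor argument through getBean only resolves for a prototype-scoped bean, so DmaapService would need roughly the following shape; the @Scope annotation and constructor are inferred from this usage, not shown in this diff:

    // assumed shape of DmaapService; only the getBean(DmaapService.class, kafka) call appears in this commit
    @Service
    @Scope("prototype")
    public class DmaapService {
        private final Kafka kafka;

        public DmaapService(Kafka kafka) {
            this.kafka = kafka; // each instance is bound to one Kafka cluster
        }
    }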
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index dd8664e8..86b27a9a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -21,23 +21,31 @@ package org.onap.datalake.feeder.service; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; +import org.apache.commons.collections.CollectionUtils; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.TopicNameRepository; import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.db.ElasticsearchService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** - * Service for topics + * Service for topics * * @author Guobiao Mo * * @@ -49,72 +57,90 @@ public class TopicService { @Autowired private ApplicationConfiguration config; - + @Autowired - private TopicRepository topicRepository; + private ApplicationContext context; @Autowired - private ElasticsearchService elasticsearchService; + private TopicNameRepository topicNameRepository; + @Autowired + private TopicRepository topicRepository; @Autowired private DbRepository dbRepository; - public TopicConfig getEffectiveTopic(String topicStr) { - try { - return getEffectiveTopic(topicStr, false); - } catch (IOException e) { - log.error(topicStr, e); + public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException { + + List<Topic> topics = findTopics(kafka, topicStr); + if (CollectionUtils.isEmpty(topics)) { + topics = new ArrayList<>(); + topics.add(getDefaultTopic(kafka)); } - return null; - } - public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { - Topic topic = getTopic(topicStr); - if (topic == null) { - topic = getDefaultTopic(); + List<EffectiveTopic> ret = new ArrayList<>(); + for (Topic topic : topics) { + if (!topic.isEnabled()) { + continue; + } + ret.add(new EffectiveTopic(topic, topicStr)); + + if (ensureTableExist) { + for (Db db : topic.getDbs()) { + if (db.isElasticsearch()) { + ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db); + elasticsearchService.ensureTableExist(topicStr); + } + } + } } - TopicConfig topicConfig = topic.getTopicConfig(); - topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic + + return ret; + } + + //TODO use query + public List<Topic> findTopics(Kafka kafka, String topicStr) { + List<Topic> ret = new ArrayList<>(); - if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) { - elasticsearchService.ensureTableExist(topicStr); + Iterable<Topic> allTopics = 
topicRepository.findAll(); + for (Topic topic : allTopics) { + if (topic.getKafkas().contains(kafka) && topic.getTopicName().getId().equals(topicStr)) { + ret.add(topic); + } } - return topicConfig; + return ret; } - public Topic getTopic(String topicStr) { - Optional<Topic> ret = topicRepository.findById(null);//FIXME + public Topic getTopic(int topicId) { + Optional<Topic> ret = topicRepository.findById(topicId); return ret.isPresent() ? ret.get() : null; } - public Topic getDefaultTopic() { - return getTopic(config.getDefaultTopicName()); + public Topic getDefaultTopic(Kafka kafka) { + return findTopics(kafka, config.getDefaultTopicName()).get(0); } - public boolean istDefaultTopic(Topic topic) { + public boolean isDefaultTopic(Topic topic) { if (topic == null) { return false; } - return true;//topic.getName().equals(config.getDefaultTopicName()); + return topic.getName().equals(config.getDefaultTopicName()); } - public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) - { + public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) { fillTopic(tConfig, wTopic); } - public Topic fillTopicConfiguration(TopicConfig tConfig) - { + public Topic fillTopicConfiguration(TopicConfig tConfig) { Topic topic = new Topic(); fillTopic(tConfig, topic); return topic; } - private void fillTopic(TopicConfig tConfig, Topic topic) - { + private void fillTopic(TopicConfig tConfig, Topic topic) { Set<Db> relateDb = new HashSet<>(); - //topic.setName(tConfig.getName()); + topic.setId(tConfig.getId()); + topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get()); topic.setLogin(tConfig.getLogin()); topic.setPass(tConfig.getPassword()); topic.setEnabled(tConfig.isEnabled()); @@ -126,24 +152,21 @@ public class TopicService { topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); - if(tConfig.getSinkdbs() != null) { + if (tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { Db sinkdb = dbRepository.findByName(item); if (sinkdb != null) { relateDb.add(sinkdb); } } - if(relateDb.size() > 0) + if (relateDb.size() > 0) topic.setDbs(relateDb); - else if(relateDb.size() == 0) - { + else if (relateDb.size() == 0) { topic.getDbs().clear(); } - }else - { + } else { topic.setDbs(relateDb); } - } }
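The //TODO use query note above flags that findTopics loads every Topic row via findAll() and filters in memory. A hedged sketch of a repository-level replacement; the method name and JPQL are assumptions and would have to match the actual entity mappings of kafkas and topicName:

    // hypothetical Spring Data JPA query replacing the findAll() scan; not part of this commit
    public interface TopicRepository extends CrudRepository<Topic, Integer> {

        @Query("select t from Topic t join t.kafkas k where k.id = :kafkaId and t.topicName.id = :topicName")
        List<Topic> findByKafkaAndTopicName(@Param("kafkaId") String kafkaId, @Param("topicName") String topicName);
    }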
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java index fc31b2eb..33c8847e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.util.ArrayList; import java.util.List; @@ -30,7 +30,8 @@ import javax.annotation.PreDestroy; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,25 +56,33 @@ import rx.functions.Func1; * */ @Service -public class CouchbaseService { +public class CouchbaseService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired ApplicationConfiguration config; - + + private Db couchbase; +/* @Autowired private DbService dbService; - Bucket bucket; private boolean isReady = false; +*/ + Bucket bucket; + public CouchbaseService() { + } + + public CouchbaseService(Db db) { + couchbase = db; + } + @PostConstruct private void init() { // Initialize Couchbase Connection try { - Db couchbase = dbService.getCouchbase(); - //this tunes the SDK (to customize connection timeout) CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s .build(); @@ -84,10 +93,10 @@ public class CouchbaseService { bucket.bucketManager().createN1qlPrimaryIndex(true, false); log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin()); - isReady = true; +// isReady = true; } catch (Exception ex) { log.error("error connection to Couchbase.", ex); - isReady = false; + // isReady = false; } } @@ -103,7 +112,8 @@ public class CouchbaseService { } } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { List<JsonDocument> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { //convert to Couchbase JsonObject from org.json JSONObject @@ -112,9 +122,9 @@ JsonObject jsonObject = JsonObject.fromJson(json.toString()); long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() //setup TTL - int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second + int expiry = (int) (timestamp / 1000L) + effectiveTopic.getTopic().getTtl() * 3600 * 24; //in second - String id = getId(topic, json); + String id = getId(effectiveTopic.getTopic(), json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } @@ -133,10 +143,10 @@ } catch (Exception e) { log.error("error saving to Couchbase.", e); } - log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); + log.debug("saved text to topic = {}, this batch count = {} ", effectiveTopic, documents.size()); } - public String getId(TopicConfig topic, JSONObject json) { + public String getId(Topic topic, JSONObject json) { //if this topic requires extract id from JSON String id = topic.getMessageId(json); if (id != null) {
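One detail worth noting in saveJsons above: Couchbase interprets small expiry values as relative seconds, but values larger than 30 days as an absolute Unix timestamp, so the code computes an absolute expiry of message time in seconds plus the topic TTL in days. A worked example of the arithmetic, with illustrative values:

    // Kafka timestamp 2019-06-27T00:00:00Z in ms, topic TTL of 7 days
    long timestamp = 1561593600000L;
    int ttlDays = 7;
    int expiry = (int) (timestamp / 1000L) + ttlDays * 3600 * 24;
    // 1561593600 + 604800 = 1562198400, an absolute epoch-seconds expiry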
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java new file mode 100644 index 00000000..5ea6e23e --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java @@ -0,0 +1,37 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service.db; + +import java.util.List; + +import org.json.JSONObject; +import org.onap.datalake.feeder.domain.EffectiveTopic; + +/** + * Interface for all db store services + * + * @author Guobiao Mo + * + */ +public interface DbStoreService { + + void saveJsons(EffectiveTopic topic, List<JSONObject> jsons); +}
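DbStoreService gives every sink (Couchbase, Elasticsearch, HDFS, MongoDB) one write interface, with each implementation constructed around a single Db row. A hedged sketch of how a caller might dispatch to the right implementation; the helper and the type checks other than isElasticsearch are assumptions, as only the interface and its implementations appear in this commit:

    // hypothetical dispatch by Db type, relying on the prototype-style constructors in this commit
    private DbStoreService findDbStoreService(Db db) {
        if (db.isElasticsearch()) {
            return context.getBean(ElasticsearchService.class, db);
        } else if (db.isCouchbase()) {        // isCouchbase/isMongoDB/isHdfs are assumed helpers
            return context.getBean(CouchbaseService.class, db);
        } else if (db.isMongoDB()) {
            return context.getBean(MongodbService.class, db);
        } else if (db.isHdfs()) {
            return context.getBean(HdfsService.class, db);
        }
        return null;
    }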
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java index b40f544c..aee63ed7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.io.IOException; import java.util.List; @@ -47,7 +47,8 @@ import org.elasticsearch.rest.RestStatus; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,24 +62,33 @@ import org.springframework.stereotype.Service; * */ @Service -public class ElasticsearchService { +public class ElasticsearchService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db elasticsearch; @Autowired private ApplicationConfiguration config; - @Autowired - private DbService dbService; + //@Autowired +// private DbService dbService; private RestHighLevelClient client; ActionListener<BulkResponse> listener; + + public ElasticsearchService() { + } + + public ElasticsearchService(Db db) { + elasticsearch = db; + } //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html @PostConstruct private void init() { - Db elasticsearch = dbService.getElasticsearch(); + //Db elasticsearch = dbService.getElasticsearch(); String elasticsearchHost = elasticsearch.getHost(); // Initialize the Connection @@ -130,24 +140,25 @@ } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { - if (topic.isCorrelateClearedMessage()) { - boolean found = correlateClearedMessage(topic, json); + if (effectiveTopic.getTopic().isCorrelateClearedMessage()) { + boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json); if (found) { continue; } } - String id = topic.getMessageId(json); //id can be null + String id = effectiveTopic.getTopic().getMessageId(json); //id can be null - request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); + request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } - log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); + log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size()); if (config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); @@ -158,7 +169,7 @@ } else { try { BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); if (bulkResponse.hasFailures()) { log.debug(bulkResponse.buildFailureMessage()); } } catch (IOException e) { - log.error(topic.getName(), e); + log.error(effectiveTopic.getName(), e); } } @@ -175,7 +186,7 @@ * source. So use the get API, three parameters: index, type, document * id */ - private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { + private boolean correlateClearedMessage(Topic topic, JSONObject json) { boolean found = false; String eName = null;
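The async path above hands the BulkRequest to client.bulkAsync together with the listener field, whose initialization falls outside this excerpt. A typical shape for such a listener with the Elasticsearch high-level REST client; this is a sketch, not the actual initialization from this codebase:

    // sketch of an ActionListener<BulkResponse> for bulkAsync; assumed, not from this commit
    listener = new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            if (bulkResponse.hasFailures()) {
                log.debug(bulkResponse.buildFailureMessage());
            }
        }

        @Override
        public void onFailure(Exception e) {
            log.error("bulk index failed", e);
        }
    };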
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java index d92d05ac..0e107fdf 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.io.IOException; import java.net.InetAddress; @@ -38,9 +38,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ShutdownHookManager; +import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.onap.datalake.feeder.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,16 +59,15 @@ import lombok.Setter; * */ @Service -public class HdfsService { +public class HdfsService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db hdfs; @Autowired ApplicationConfiguration config; - @Autowired - private DbService dbService; - FileSystem fileSystem; private boolean isReady = false; @@ -113,6 +113,14 @@ public class HdfsService { messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used } + public void addData2(List<JSONObject> messages) { + if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer + lastFlush = System.currentTimeMillis(); + } + + messages.stream().forEach(message -> data.add(message.toString())); + } + private void saveMessages(String topic, List<String> bufferList) throws IOException { long thread = Thread.currentThread().getId(); @@ -144,12 +152,17 @@ } } + public HdfsService() { + } + + public HdfsService(Db db) { + hdfs = db; + } + @PostConstruct private void init() { // Initialize HDFS Connection try { - Db hdfs = dbService.getHdfs(); - //Get configuration of Hadoop system Configuration hdfsConfig = new Configuration(); @@ -200,7 +213,8 @@ bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); } - public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) { + //used if raw data should be saved + public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) { String topicStr = topic.getName(); Map<String, Buffer> bufferMap = bufferLocal.get(); @@ -215,4 +229,21 @@ } } + @Override + public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) { + String topicStr = topic.getName(); + + Map<String, Buffer> bufferMap = bufferLocal.get(); + final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); + + buffer.addData2(jsons); + + if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) { + buffer.flush(topicStr); + } else { + log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize()); + } + + } + }
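The flush decision in saveJsons above is driven by two ApplicationConfiguration settings: isAsync() chooses between flush-on-every-call and batched writes, and getHdfsBatchSize() caps the per-topic buffer. A sketch of matching configuration entries; the property names and values here are assumptions inferred from the getters, and the project's actual prefix and defaults may differ:

    # hypothetical application.properties entries matching the getters used above
    async=true
    hdfsBatchSize=250
    checkTopicInterval=10000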
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java index f3462e49..0f522f6b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.datalake.feeder.service; +package org.onap.datalake.feeder.service.db; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +34,7 @@ import org.bson.Document; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,26 +59,32 @@ import com.mongodb.client.model.InsertManyOptions; * */ @Service -public class MongodbService { +public class MongodbService implements DbStoreService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db mongodb; @Autowired private ApplicationConfiguration config; private boolean dbReady = false; - @Autowired - private DbService dbService; + //@Autowired +// private DbService dbService; private MongoDatabase database; private MongoClient mongoClient; private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>(); private InsertManyOptions insertManyOptions; + public MongodbService() { + } + + public MongodbService(Db db) { + mongodb = db; + } + @PostConstruct private void init() { - Db mongodb = dbService.getMongoDB(); - String host = mongodb.getHost(); Integer port = mongodb.getPort(); @@ -141,7 +147,7 @@ } } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) { if (dbReady == false)//TODO throw exception return; List<Document> documents = new ArrayList<>(jsons.size()); @@ -149,14 +155,14 @@ //convert org.json JSONObject to MongoDB Document Document doc = Document.parse(json.toString()); - String id = topic.getMessageId(json); //id can be null + String id = effectiveTopic.getTopic().getMessageId(json); //id can be null if (id != null) { doc.put("_id", id); } documents.add(doc); } - String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . + String collectionName = effectiveTopic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); try { @@ -168,7 +174,7 @@ } } - log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); + log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size()); } }
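A closing note on the Mongo path: the class declares an insertManyOptions field, but its initialization and use fall outside this excerpt. For a batch sink that may hit duplicate _id values, an unordered insert is the usual choice; a sketch of what such an init could look like, assumed rather than taken from this commit:

    // assumed initialization; unordered inserts continue past individual failures
    // such as duplicate-key errors instead of aborting the whole batch
    insertManyOptions = new InsertManyOptions().ordered(false);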