From b14c5766902d486a94a8db96d2a31ff0e9e8255e Mon Sep 17 00:00:00 2001
From: Guobiao Mo
Date: Thu, 27 Jun 2019 18:42:59 -0700
Subject: supports multiple Kafka clusters and DBs

Read data from Kafka and store into DBs

Issue-ID: DCAEGEN2-1631
Change-Id: Ic2736b6e0497ac2084b1a7ce0da3a6e0e1379f43
Signed-off-by: Guobiao Mo
---
 .../feeder/config/ApplicationConfiguration.java    |   4 +-
 .../datalake/feeder/controller/DbController.java   |  16 +-
 .../feeder/controller/TopicController.java         |  54 +++--
 .../java/org/onap/datalake/feeder/domain/Db.java   |  30 ++-
 .../org/onap/datalake/feeder/domain/DbType.java    |   6 +-
 .../datalake/feeder/domain/EffectiveTopic.java     |  64 +++++
 .../org/onap/datalake/feeder/domain/Kafka.java     |  17 +-
 .../org/onap/datalake/feeder/domain/Topic.java     | 127 +++++++---
 .../org/onap/datalake/feeder/dto/DbConfig.java     |   1 +
 .../org/onap/datalake/feeder/dto/TopicConfig.java  |  75 +-----
 .../datalake/feeder/enumeration/DbTypeEnum.java    |  10 +-
 .../feeder/enumeration/DesignTypeEnum.java         |  38 +++
 .../feeder/repository/TopicNameRepository.java     |  35 +++
 .../feeder/repository/TopicRepository.java         |   5 +-
 .../datalake/feeder/service/CouchbaseService.java  | 170 --------------
 .../onap/datalake/feeder/service/DbService.java    |  28 ---
 .../onap/datalake/feeder/service/DmaapService.java |  34 ++-
 .../feeder/service/ElasticsearchService.java       | 247 --------------------
 .../onap/datalake/feeder/service/HdfsService.java  | 218 -----------------
 .../datalake/feeder/service/MongodbService.java    | 174 --------------
 .../feeder/service/PortalDesignService.java        | 109 ++++-----
 .../onap/datalake/feeder/service/PullService.java  |  14 +-
 .../org/onap/datalake/feeder/service/Puller.java   |  25 +-
 .../onap/datalake/feeder/service/StoreService.java |  97 +++++---
 .../feeder/service/TopicConfigPollingService.java  | 108 ++++++---
 .../onap/datalake/feeder/service/TopicService.java | 103 ++++----
 .../feeder/service/db/CouchbaseService.java        | 180 ++++++++++++++
 .../datalake/feeder/service/db/DbStoreService.java |  37 +++
 .../feeder/service/db/ElasticsearchService.java    | 258 +++++++++++++++++++++
 .../datalake/feeder/service/db/HdfsService.java    | 249 ++++++++++++++++++++
 .../datalake/feeder/service/db/MongodbService.java | 180 ++++++++++++++
 31 files changed, 1509 insertions(+), 1204 deletions(-)
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java
 delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
 delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
 delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
 delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
 create mode 100644
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java (limited to 'components/datalake-handler/feeder/src/main') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index e371af1b..806dc72e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -54,6 +54,8 @@ public class ApplicationConfiguration { private String defaultTopicName; + private int checkTopicInterval; //in millisecond +/* //DMaaP private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; @@ -68,7 +70,7 @@ public class ApplicationConfiguration { private int dmaapCheckNewTopicInterval; //in millisecond private int kafkaConsumerCount; - +*/ private String elasticsearchType; //HDFS diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java index bd9b742b..322be412 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java @@ -27,8 +27,6 @@ import javax.servlet.http.HttpServletResponse; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; -import org.onap.datalake.feeder.repository.TopicRepository; -import org.onap.datalake.feeder.service.DbService; import org.onap.datalake.feeder.dto.DbConfig; import org.onap.datalake.feeder.controller.domain.PostReturnBody; import org.slf4j.Logger; @@ -59,12 +57,6 @@ public class DbController { @Autowired private DbRepository dbRepository; - @Autowired - private TopicRepository topicRepository; - - @Autowired - private DbService dbService; - //list all dbs @GetMapping("") @ResponseBody @@ -92,11 +84,11 @@ public class DbController { return null; } - Db oldDb = dbService.getDb(dbConfig.getName()); +/* Db oldDb = dbService.getDb(dbConfig.getName()); if (oldDb != null) { sendError(response, 400, "Db already exists: " + dbConfig.getName()); return null; - } else { + } else {*/ Db newdb = new Db(); newdb.setName(dbConfig.getName()); newdb.setHost(dbConfig.getHost()); @@ -118,7 +110,7 @@ public class DbController { retBody.setReturnBody(retMsg); retBody.setStatusCode(200); return retBody; - } + //} } //Show a db @@ -191,7 +183,7 @@ public class DbController { return null; } - Db oldDb = dbService.getDb(dbConfig.getName()); + Db oldDb = dbRepository.findById(dbConfig.getId()).get(); if (oldDb == null) { sendError(response, 404, "Db not found: " + dbConfig.getName()); return null; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 93cec8bb..1162aedd 100644 --- 
a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -27,17 +27,18 @@ import java.util.Set; import javax.servlet.http.HttpServletResponse; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.controller.domain.PostReturnBody; import org.onap.datalake.feeder.dto.TopicConfig; -import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.onap.datalake.feeder.repository.TopicRepository; -import org.onap.datalake.feeder.service.DbService; import org.onap.datalake.feeder.service.DmaapService; import org.onap.datalake.feeder.service.TopicService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.http.MediaType; import org.springframework.validation.BindingResult; import org.springframework.web.bind.annotation.DeleteMapping; @@ -71,19 +72,27 @@ public class TopicController { private final Logger log = LoggerFactory.getLogger(this.getClass()); + //@Autowired + //private DmaapService dmaapService; + @Autowired - private DmaapService dmaapService; + private ApplicationContext context; + @Autowired + private KafkaRepository kafkaRepository; + @Autowired private TopicRepository topicRepository; @Autowired private TopicService topicService; - @GetMapping("/dmaap") + @GetMapping("/dmaap/{kafkaId}") @ResponseBody @ApiOperation(value = "List all topic names in DMaaP.") - public List listDmaapTopics() { + public List listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) { + Kafka kafka = kafkaRepository.findById(kafkaId).get(); + DmaapService dmaapService = context.getBean(DmaapService.class, kafka); return dmaapService.getTopics(); } @@ -95,7 +104,7 @@ public class TopicController { List retString = new ArrayList<>(); for(Topic item : ret) { - if(!topicService.istDefaultTopic(item)) + if(!topicService.isDefaultTopic(item)) retString.add(item.getName()); } return retString; @@ -110,24 +119,25 @@ public class TopicController { sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - Topic oldTopic = topicService.getTopic(topicConfig.getName()); + /*Topic oldTopic = topicService.getTopic(topicConfig.getName()); if (oldTopic != null) { sendError(response, 400, "Topic already exists "+topicConfig.getName()); return null; - } else { + } else {*/ Topic wTopic = topicService.fillTopicConfiguration(topicConfig); if(wTopic.getTtl() == 0) wTopic.setTtl(3650); topicRepository.save(wTopic); return mkPostReturnBody(200, wTopic); - } + //} + //FIXME need to connect to Kafka } - @GetMapping("/{topicName}") + @GetMapping("/{topicId}") @ResponseBody @ApiOperation(value="Get a topic's settings.") - public TopicConfig getTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException { - Topic topic = topicService.getTopic(topicName); + public TopicConfig getTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException { + Topic topic = topicService.getTopic(topicId); if(topic == null) { sendError(response, 404, "Topic not found"); return null; @@ -137,23 +147,23 @@ public class TopicController { //This is not a partial update: old 
topic is wiped out, and new topic is created based on the input json. //One exception is that old DBs are kept - @PutMapping("/{topicName}") + @PutMapping("/{topicId}") @ResponseBody @ApiOperation(value="Update a topic.") - public PostReturnBody updateTopic(@PathVariable("topicName") String topicName, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException { + public PostReturnBody updateTopic(@PathVariable("topicId") int topicId, @RequestBody TopicConfig topicConfig, BindingResult result, HttpServletResponse response) throws IOException { if (result.hasErrors()) { sendError(response, 400, "Error parsing Topic: "+result.toString()); return null; } - if(!topicName.equals(topicConfig.getName())) + if(topicId!=topicConfig.getId()) { - sendError(response, 400, "Topic name mismatch" + topicName + topicConfig.getName()); + sendError(response, 400, "Topic name mismatch" + topicId + topicConfig); return null; } - Topic oldTopic = topicService.getTopic(topicConfig.getName()); + Topic oldTopic = topicService.getTopic(topicId); if (oldTopic == null) { sendError(response, 404, "Topic not found "+topicConfig.getName()); return null; @@ -164,14 +174,14 @@ public class TopicController { } } - @DeleteMapping("/{topicName}") + @DeleteMapping("/{topicId}") @ResponseBody - @ApiOperation(value="Update a topic.") - public void deleteTopic(@PathVariable("topicName") String topicName, HttpServletResponse response) throws IOException + @ApiOperation(value="Delete a topic.") + public void deleteTopic(@PathVariable("topicId") int topicId, HttpServletResponse response) throws IOException { - Topic oldTopic = topicService.getTopic(topicName); + Topic oldTopic = topicService.getTopic(topicId); if (oldTopic == null) { - sendError(response, 404, "Topic not found "+topicName); + sendError(response, 404, "Topic not found "+topicId); } else { Set dbRelation = oldTopic.getDbs(); dbRelation.clear(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java index d84b34f8..7059cd09 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -32,6 +32,9 @@ import javax.persistence.JoinTable; import javax.persistence.ManyToMany; import javax.persistence.ManyToOne; import javax.persistence.Table; + +import org.onap.datalake.feeder.enumeration.DbTypeEnum; + import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; @@ -51,12 +54,12 @@ public class Db { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) @Column(name = "`id`") - private Integer id; + private int id; @Column(name="`name`") private String name; - @Column(name="`enabled`") + @Column(name="`enabled`", nullable = false) private boolean enabled; @Column(name="`host`") @@ -98,13 +101,30 @@ public class Db { ) private Set topics; - public Db() { + public boolean isHdfs() { + return isDb(DbTypeEnum.HDFS); + } + + public boolean isElasticsearch() { + return isDb(DbTypeEnum.ES); + } + + public boolean isCouchbase() { + return isDb(DbTypeEnum.CB); + } + + public boolean isDruid() { + return isDb(DbTypeEnum.DRUID); } - public Db(String name) { - this.name = name; + public boolean isMongoDB() { + return isDb(DbTypeEnum.MONGO); } + private boolean isDb(DbTypeEnum dbTypeEnum) { + return 
dbTypeEnum.equals(DbTypeEnum.valueOf(dbType.getId())); + } + @Override public String toString() { return String.format("Db %s (name=%, enabled=%s)", id, name, enabled); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java index 0a88b155..9c83a9cd 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/DbType.java @@ -48,14 +48,14 @@ public class DbType { @Column(name="`id`") private String id; - @Column(name="`name`") + @Column(name="`name`", nullable = false) private String name; @Column(name="`default_port`") private Integer defaultPort; - @Column(name="`tool`") - private Boolean tool; + @Column(name="`tool`", nullable = false) + private boolean tool; @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY, mappedBy = "dbType") protected Set dbs = new HashSet<>(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java new file mode 100644 index 00000000..df7aad04 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/EffectiveTopic.java @@ -0,0 +1,64 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +/** + * A warper of parent Topic + * + * @author Guobiao Mo + * + */ + +public class EffectiveTopic { + private Topic topic; //base Topic + + String name; + + public EffectiveTopic(Topic baseTopic) { + topic = baseTopic; + } + + public EffectiveTopic(Topic baseTopic, String name ) { + topic = baseTopic; + this.name = name; + } + + public String getName() { + return name==null?topic.getName():name; + } + + public void setName(String name) { + this.name = name; + } + + public Topic getTopic() { + return topic; + } + + public void setTopic(Topic topic) { + this.topic = topic; + } + + @Override + public String toString() { + return String.format("EffectiveTopic %s (base Topic=%s)", getName(), topic.toString()); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java index e3347a4a..d2189cbc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java @@ -49,23 +49,23 @@ public class Kafka { @Column(name="`id`") private String id; - @Column(name="`name`") + @Column(name="`name`", nullable = false) private String name; - @Column(name="`enabled`") + @Column(name="`enabled`", nullable = false) private boolean enabled; - @Column(name="broker_list") + @Column(name="broker_list", nullable = false) private String brokerList;//message-router-kafka:9092,message-router-kafka2:9092 - @Column(name="`zk`") + @Column(name="`zk`", nullable = false) private String zooKeeper;//message-router-zookeeper:2181 @Column(name="`group`", columnDefinition = "varchar(255) DEFAULT 'datalake'") private String group; @Column(name="`secure`", columnDefinition = " bit(1) DEFAULT 0") - private Boolean secure; + private boolean secure; @Column(name="`login`") private String login; @@ -81,8 +81,7 @@ public class Kafka { @Column(name="`included_topic`") private String includedTopic; - //@Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'") - @Column(name="`excluded_topic`") + @Column(name="`excluded_topic`", columnDefinition = "varchar(1023) default '__consumer_offsets,__transaction_state'") private String excludedTopic; @Column(name="`consumer_count`", columnDefinition = "integer default 3") @@ -93,8 +92,8 @@ public class Kafka { private Integer timeout; //don't show this field in admin UI - @Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10") - private Integer checkTopicInterval; + //@Column(name="`check_topic_interval_sec`", columnDefinition = "integer default 10") +// private Integer checkTopicInterval; @JsonBackReference @ManyToMany(fetch = FetchType.EAGER) diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index cb07e140..a27b6756 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -20,6 +20,7 @@ package org.onap.datalake.feeder.domain; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; 
import java.util.Set; @@ -33,7 +34,16 @@ import javax.persistence.ManyToMany; import javax.persistence.ManyToOne; import javax.persistence.Table; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONObject; import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.enumeration.DbTypeEnum; +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; import com.fasterxml.jackson.annotation.JsonBackReference; @@ -71,30 +81,30 @@ public class Topic { //@JsonManagedReference @ManyToMany(fetch = FetchType.EAGER) @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") }) - protected Set dbs; + protected Set dbs=new HashSet<>(); @ManyToMany(fetch = FetchType.EAGER) @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") }) - protected Set kafkas; + protected Set kafkas=new HashSet<>(); /** * indicate if we should monitor this topic */ - @Column(name = "`enabled`") - private Boolean enabled; + @Column(name = "`enabled`", nullable = false) + private boolean enabled; /** * save raw message text */ - @Column(name = "`save_raw`") - private Boolean saveRaw; + @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0") + private boolean saveRaw; /** * need to explicitly tell feeder the data format of the message. support JSON, * XML, YAML, TEXT */ @Column(name = "`data_format`") - private String dataFormat; + protected String dataFormat; /** * TTL in day @@ -103,41 +113,33 @@ public class Topic { private Integer ttl; //if this flag is true, need to correlate alarm cleared message to previous alarm - @Column(name = "`correlate_cleared_message`") - private Boolean correlateClearedMessage; + @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0") + private boolean correlateClearedMessage; //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name" @Column(name = "`message_id_path`") - private String messageIdPath; + protected String messageIdPath; //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray" - @Column(name = "`aggregate_array_path`") - private String aggregateArrayPath; + @Column(name = "`aggregate_array_path`") + protected String aggregateArrayPath; //paths to the element in array that need flatten, this element is used as label, comma separated, //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..." 
- @Column(name = "`flatten_array_path`") - private String flattenArrayPath; + @Column(name = "`flatten_array_path`") + protected String flattenArrayPath; public Topic() { } - +/* public Topic(String name) {//TODO //this.name = name; } - +*/ public String getName() { return topicName.getId(); } - public boolean isEnabled() { - return is(enabled); - } - - public boolean isCorrelateClearedMessage() { - return is(correlateClearedMessage); - } - public int getTtl() { if (ttl != null) { return ttl; @@ -145,27 +147,86 @@ public class Topic { return 3650;//default to 10 years for safe } } +/* + public boolean supportHdfs() { + return supportDb(DbTypeEnum.HDFS); + } + + public boolean supportElasticsearch() { + return supportDb(DbTypeEnum.ES); + } + + public boolean supportCouchbase() { + return supportDb(DbTypeEnum.CB); + } - private boolean is(Boolean b) { - return is(b, false); + public boolean supportDruid() { + return supportDb(DbTypeEnum.DRUID); } - private boolean is(Boolean b, boolean defaultValue) { - if (b != null) { - return b; + public boolean supportMongoDB() { + return supportDb(DbTypeEnum.MONGO); + } + + private boolean supportDb(DbTypeEnum dbTypeEnum) { + for(Db db : dbs) { + + } + } +*/ + public DataFormat getDataFormat2() { + if (dataFormat != null) { + return DataFormat.fromString(dataFormat); } else { - return defaultValue; + return null; + } + } + + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); + } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); } + + return ret; } - public boolean isSaveRaw() { - return is(saveRaw); + //extract DB id from JSON attributes, support multiple attributes + public String getMessageId(JSONObject json) { + String id = null; + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); + } + + return id; } public TopicConfig getTopicConfig() { TopicConfig tConfig = new TopicConfig(); - //tConfig.setName(getName()); + tConfig.setId(getId()); + tConfig.setName(getName()); tConfig.setLogin(getLogin()); tConfig.setEnabled(isEnabled()); tConfig.setDataFormat(dataFormat); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java index 0b6c54c3..eff87114 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/DbConfig.java @@ -33,6 +33,7 @@ import lombok.Setter; @Getter @Setter public class DbConfig { + private int id; private String name; private String host; private boolean enabled; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 1fffa7ec..ace7bfa9 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -41,6 +41,7 
@@ import org.onap.datalake.feeder.enumeration.DataFormat; public class TopicConfig { + private int id; private String name; private String login; private String password; @@ -54,79 +55,7 @@ public class TopicConfig { private String messageIdPath; private String aggregateArrayPath; private String flattenArrayPath; - - public DataFormat getDataFormat2() { - if (dataFormat != null) { - return DataFormat.fromString(dataFormat); - } else { - return null; - } - } - - public boolean supportHdfs() { - return supportDb("HDFS"); - } - - public boolean supportElasticsearch() { - return supportDb("Elasticsearch");//TODO string hard codes - } - - public boolean supportCouchbase() { - return supportDb("Couchbase"); - } - - public boolean supportDruid() { - return supportDb("Druid"); - } - - public boolean supportMongoDB() { - return supportDb("MongoDB"); - } - - private boolean supportDb(String dbName) { - return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName)); - } - - //extract DB id from JSON attributes, support multiple attributes - public String getMessageId(JSONObject json) { - String id = null; - - if (StringUtils.isNotBlank(messageIdPath)) { - String[] paths = messageIdPath.split(","); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < paths.length; i++) { - if (i > 0) { - sb.append('^'); - } - sb.append(json.query(paths[i]).toString()); - } - id = sb.toString(); - } - - return id; - } - - public String[] getAggregateArrayPath2() { - String[] ret = null; - - if (StringUtils.isNotBlank(aggregateArrayPath)) { - ret = aggregateArrayPath.split(","); - } - - return ret; - } - - public String[] getFlattenArrayPath2() { - String[] ret = null; - - if (StringUtils.isNotBlank(flattenArrayPath)) { - ret = flattenArrayPath.split(","); - } - - return ret; - } - + @Override public String toString() { return String.format("TopicConfig %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java index 9b1eb23b..05d76d55 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java @@ -26,7 +26,7 @@ package org.onap.datalake.feeder.enumeration; * */ public enum DbTypeEnum { - CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"); + CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset"); private final String name; @@ -34,12 +34,4 @@ public enum DbTypeEnum { this.name = name; } - public static DbTypeEnum fromString(String s) { - for (DbTypeEnum df : DbTypeEnum.values()) { - if (df.name.equalsIgnoreCase(s)) { - return df; - } - } - throw new IllegalArgumentException("Invalid value for db: " + s); - } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java new file mode 100644 index 00000000..157fbf94 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DesignTypeEnum.java @@ -0,0 +1,38 @@ +/* +* 
============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +/** + * Design type + * + * @author Guobiao Mo + * + */ +public enum DesignTypeEnum { + KIBANA_DB("Kibana Dashboard"), KIBANA_SEARCH("Kibana Search"), KIBANA_VISUAL("Kibana Visualization"), + ES_MAPPING("Elasticsearch Field Mapping Template"), DRUID_KAFKA_SPEC("Druid Kafka Indexing Service Supervisor Spec"); + + private final String name; + + DesignTypeEnum(String name) { + this.name = name; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java new file mode 100644 index 00000000..9f8ea8a9 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicNameRepository.java @@ -0,0 +1,35 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.TopicName; +import org.springframework.data.repository.CrudRepository; + +/** + * + * TopicName Repository + * + * @author Guobiao Mo + * + */ + +public interface TopicNameRepository extends CrudRepository { + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java index 182bf6f1..b4dd6374 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -19,6 +19,9 @@ */ package org.onap.datalake.feeder.repository; +import java.util.List; + +import org.onap.datalake.feeder.domain.Portal; import org.onap.datalake.feeder.domain.Topic; import org.springframework.data.repository.CrudRepository; @@ -32,5 +35,5 @@ import org.springframework.data.repository.CrudRepository; */ public interface TopicRepository extends CrudRepository { - + //List findByTopicName(String topicStr); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java deleted file mode 100644 index fc31b2eb..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ /dev/null @@ -1,170 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DATALAKE -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-* ============LICENSE_END========================================================= -*/ - -package org.onap.datalake.feeder.service; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.json.JSONObject; -import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.couchbase.client.java.Bucket; -import com.couchbase.client.java.Cluster; -import com.couchbase.client.java.CouchbaseCluster; -import com.couchbase.client.java.document.JsonDocument; -import com.couchbase.client.java.document.json.JsonObject; -import com.couchbase.client.java.env.CouchbaseEnvironment; -import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; -import com.couchbase.client.java.error.DocumentAlreadyExistsException; - -import rx.Observable; -import rx.functions.Func1; - -/** - * Service to use Couchbase - * - * @author Guobiao Mo - * - */ -@Service -public class CouchbaseService { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Autowired - ApplicationConfiguration config; - - @Autowired - private DbService dbService; - - Bucket bucket; - private boolean isReady = false; - - @PostConstruct - private void init() { - // Initialize Couchbase Connection - try { - Db couchbase = dbService.getCouchbase(); - - //this tunes the SDK (to customize connection timeout) - CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s - .build(); - Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost()); - cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); - bucket = cluster.openBucket(couchbase.getDatabase()); - // Create a N1QL Primary Index (but ignore if it exists) - bucket.bucketManager().createN1qlPrimaryIndex(true, false); - - log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin()); - isReady = true; - } catch (Exception ex) { - log.error("error connection to Couchbase.", ex); - isReady = false; - } - } - - @PreDestroy - public void cleanUp() { - config.getShutdownLock().readLock().lock(); - - try { - log.info("bucket.close() at cleanUp."); - bucket.close(); - } finally { - config.getShutdownLock().readLock().unlock(); - } - } - - public void saveJsons(TopicConfig topic, List jsons) { - List documents = new ArrayList<>(jsons.size()); - for (JSONObject json : jsons) { - //convert to Couchbase JsonObject from org.json JSONObject - JsonObject jsonObject = JsonObject.fromJson(json.toString()); - - long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() - - //setup TTL - int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second - - String id = getId(topic, json); - JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); - documents.add(doc); - } - try { - saveDocuments(documents); - } catch (DocumentAlreadyExistsException e) { - log.error("Some or all the following ids are duplicate."); - for(JsonDocument document : documents) { - log.error("saveJsons() DocumentAlreadyExistsException {}", document.id()); - } - } catch (rx.exceptions.CompositeException e) { - List causes = 
e.getExceptions(); - for(Throwable cause : causes) { - log.error("saveJsons() CompositeException cause {}", cause.getMessage()); - } - } catch (Exception e) { - log.error("error saving to Couchbase.", e); - } - log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); - } - - public String getId(TopicConfig topic, JSONObject json) { - //if this topic requires extract id from JSON - String id = topic.getMessageId(json); - if (id != null) { - return id; - } - - String topicStr = topic.getName(); - id = topicStr+":"+UUID.randomUUID(); - - //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 - //atomically get the next sequence number: - // increment by 1, initialize at 0 if counter doc not found - //TODO how slow is this compared with above UUID approach? - //sometimes this gives java.util.concurrent.TimeoutException - //JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 - //id = topicStr + ":" + nextIdNumber.content(); - - return id; - } - - //https://docs.couchbase.com/java-sdk/2.7/document-operations.html - private void saveDocuments(List documents) { - Observable.from(documents).flatMap(new Func1>() { - @Override - public Observable call(final JsonDocument docToInsert) { - return bucket.async().insert(docToInsert); - } - }).last().toBlocking().single(); - } - -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index 6d6fb750..2e934e2e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -20,9 +20,6 @@ package org.onap.datalake.feeder.service; -import java.util.Optional; - -import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.repository.DbRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -38,29 +35,4 @@ public class DbService { @Autowired private DbRepository dbRepository; - - public Db getDb(String name) { - return dbRepository.findByName(name); - } - - public Db getCouchbase() { - return getDb("Couchbase"); - } - - public Db getElasticsearch() { - return getDb("Elasticsearch"); - } - - public Db getMongoDB() { - return getDb("MongoDB"); - } - - public Db getDruid() { - return getDb("Druid"); - } - - public Db getHdfs() { - return getDb("HDFS"); - } - } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index 5c544d6c..1bfd437f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -24,7 +24,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import javax.annotation.PostConstruct; @@ -35,6 +37,8 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import 
org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +64,12 @@ public class DmaapService { private ZooKeeper zk; + private Kafka kafka; + + public DmaapService(Kafka kafka) { + this.kafka = kafka; + } + @PreDestroy public void cleanUp() throws InterruptedException { config.getShutdownLock().readLock().lock(); @@ -76,7 +86,7 @@ public class DmaapService { @PostConstruct private void init() throws IOException, InterruptedException { - zk = connect(config.getDmaapZookeeperHostPort()); + zk = connect(kafka.getZooKeeper()); } //get all topic names from Zookeeper @@ -84,11 +94,11 @@ public class DmaapService { public List getTopics() { try { if (zk == null) { - zk = connect(config.getDmaapZookeeperHostPort()); + zk = connect(kafka.getZooKeeper()); } - log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort()); + log.info("connecting to ZooKeeper {} for a list of topics.", kafka.getZooKeeper()); List topics = zk.getChildren("/brokers/topics", false); - String[] excludes = config.getDmaapKafkaExclude(); + String[] excludes = kafka.getExcludedTopic().split(","); topics.removeAll(Arrays.asList(excludes)); log.info("list of topics: {}", topics); return topics; @@ -100,7 +110,7 @@ public class DmaapService { } private ZooKeeper connect(String host) throws IOException, InterruptedException { - log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort()); + log.info("connecting to ZooKeeper {} ...", kafka.getZooKeeper()); CountDownLatch connectedSignal = new CountDownLatch(1); ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() { public void process(WatchedEvent we) { @@ -126,18 +136,18 @@ public class DmaapService { return ret; } */ - public List getActiveTopicConfigs() throws IOException { + public Map> getActiveEffectiveTopic() throws IOException { log.debug("entering getActiveTopicConfigs()..."); - List allTopics = getTopics(); + List allTopics = getTopics(); //topics in Kafka cluster TODO update table topic_name with new topics - List ret = new ArrayList<>(allTopics.size()); + Map> ret = new HashMap<>(); for (String topicStr : allTopics) { log.debug("get topic setting from DB: {}.", topicStr); - TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true); - if (topicConfig.isEnabled()) { - ret.add(topicConfig); - } + List effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true); + + ret.put(topicStr , effectiveTopics); + } return ret; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java deleted file mode 100644 index b40f544c..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ /dev/null @@ -1,247 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DATALAKE -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ - -package org.onap.datalake.feeder.service; - -import java.io.IOException; -import java.util.List; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpHost; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; -import org.json.JSONObject; -import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -/** - * Elasticsearch Service for table creation, data submission, as well as data pre-processing. 
- * - * @author Guobiao Mo - * - */ -@Service -public class ElasticsearchService { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private ApplicationConfiguration config; - - @Autowired - private DbService dbService; - - private RestHighLevelClient client; - ActionListener listener; - - //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication - //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html - @PostConstruct - private void init() { - Db elasticsearch = dbService.getElasticsearch(); - String elasticsearchHost = elasticsearch.getHost(); - - // Initialize the Connection - client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); - - log.info("Connected to Elasticsearch Host {}", elasticsearchHost); - - listener = new ActionListener() { - @Override - public void onResponse(BulkResponse bulkResponse) { - if(bulkResponse.hasFailures()) { - log.debug(bulkResponse.buildFailureMessage()); - } - } - - @Override - public void onFailure(Exception e) { - log.error(e.getMessage()); - } - }; - } - - @PreDestroy - public void cleanUp() throws IOException { - config.getShutdownLock().readLock().lock(); - - try { - log.info("cleanUp() closing Elasticsearch client."); - client.close(); - } catch (IOException e) { - log.error("client.close() at cleanUp.", e); - } finally { - config.getShutdownLock().readLock().unlock(); - } - } - - public void ensureTableExist(String topic) throws IOException { - String topicLower = topic.toLowerCase(); - - GetIndexRequest request = new GetIndexRequest(topicLower); - - boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); - if (!exists) { - //TODO submit mapping template - CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); - CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); - log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged()); - } - } - - //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME - public void saveJsons(TopicConfig topic, List jsons) { - - BulkRequest request = new BulkRequest(); - - for (JSONObject json : jsons) { - if (topic.isCorrelateClearedMessage()) { - boolean found = correlateClearedMessage(topic, json); - if (found) { - continue; - } - } - - String id = topic.getMessageId(json); //id can be null - - request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); - } - - log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); - - if (config.isAsync()) { - client.bulkAsync(request, RequestOptions.DEFAULT, listener); - } else { - try { - BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); - if(bulkResponse.hasFailures()) { - log.debug(bulkResponse.buildFailureMessage()); - } - } catch (IOException e) { - log.error(topic.getName(), e); - } - } - - } - - /** - * - * @param topic - * @param json - * @return boolean - * - * Because of query by id, The search API cannot be used for query. The - * search API can only query all data or based on the fields in the - * source. 
So use the get API, three parameters: index, type, document - * id - */ - private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { - boolean found = false; - String eName = null; - - try { - eName = json.query("/event/commonEventHeader/eventName").toString(); - - if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) { - - String name = eName.substring(0, eName.length() - 7); - String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); - String specificProblem = json.query("/event/faultFields/specificProblem").toString(); - - String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb" - String index = topic.getName().toLowerCase(); - - //get - GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); - - GetResponse getResponse = null; - try { - getResponse = client.get(getRequest, RequestOptions.DEFAULT); - if (getResponse != null) { - - if (getResponse.isExists()) { - String sourceAsString = getResponse.getSourceAsString(); - JSONObject jsonObject = new JSONObject(sourceAsString); - jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); - String jsonString = jsonObject.toString(); - - //update - IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); - request.source(jsonString, XContentType.JSON); - IndexResponse indexResponse = null; - try { - indexResponse = client.index(request, RequestOptions.DEFAULT); - found = true; - } catch (IOException e) { - log.error("save failure"); - } - } else { - log.error("The getResponse was not exists"); - } - - } else { - log.error("The document for this id was not found"); - } - - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.NOT_FOUND) { - log.error("The document for this id was not found"); - } - if (e.status() == RestStatus.CONFLICT) { - log.error("Version conflict"); - } - log.error("Get document exception", e); - } catch (IOException e) { - log.error(topic.getName(), e); - } - - } - - } catch (Exception e) { - log.error("error", e); - } - - return found; - } - -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java deleted file mode 100644 index d92d05ac..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java +++ /dev/null @@ -1,218 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DATALAKE -* ================================================================================ -* Copyright 2019 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-* ============LICENSE_END========================================================= -*/ - -package org.onap.datalake.feeder.service; - -import java.io.IOException; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ShutdownHookManager; -import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; -import org.onap.datalake.feeder.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import lombok.Getter; -import lombok.Setter; - -/** - * Service to write data to HDFS - * - * @author Guobiao Mo - * - */ -@Service -public class HdfsService { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Autowired - ApplicationConfiguration config; - - @Autowired - private DbService dbService; - - FileSystem fileSystem; - private boolean isReady = false; - - private ThreadLocal> bufferLocal = ThreadLocal.withInitial(HashMap::new); - private ThreadLocal dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); - private ThreadLocal timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS")); - - @Setter - @Getter - private class Buffer { - long lastFlush; - List data; - - public Buffer() { - lastFlush = Long.MIN_VALUE; - data = new ArrayList<>(); - } - - public void flush(String topic) { - try { - if (!data.isEmpty()) { - saveMessages(topic, data); - data.clear(); - lastFlush = System.currentTimeMillis(); - } - } catch (IOException e) { - log.error("{} error saving to HDFS. {}", topic, e.getMessage()); - } - } - - public void flushStall(String topic) { - if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) { - log.debug("going to flushStall topic={}, buffer size={}", topic, data.size()); - flush(topic); - } - } - - public void addData(List> messages) { - if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer - lastFlush = System.currentTimeMillis(); - } - - messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used - } - - private void saveMessages(String topic, List bufferList) throws IOException { - - long thread = Thread.currentThread().getId(); - Date date = new Date(); - String day = dayFormat.get().format(date); - String time = timeFormat.get().format(date); - - InetAddress inetAddress = InetAddress.getLocalHost(); - String hostName = inetAddress.getHostName(); - - String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread); - Path path = new Path(filePath); - log.debug("writing {} to HDFS {}", bufferList.size(), filePath); - - // Create a new file and write data to it. 
- FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize()); - - bufferList.stream().forEach(message -> { - try { - out.writeUTF(message); - out.write('\n'); - } catch (IOException e) { - log.error("error writing to HDFS. {}", e.getMessage()); - } - }); - - out.close(); - log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath); - } - } - - @PostConstruct - private void init() { - // Initialize HDFS Connection - try { - Db hdfs = dbService.getHdfs(); - - //Get configuration of Hadoop system - Configuration hdfsConfig = new Configuration(); - - int port = hdfs.getPort() == null ? 8020 : hdfs.getPort(); - - String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port); - hdfsConfig.set("fs.defaultFS", hdfsuri); - System.setProperty("HADOOP_USER_NAME", hdfs.getLogin()); - - log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin()); - - fileSystem = FileSystem.get(hdfsConfig); - - //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data - ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get(); - hadoopShutdownHookManager.clearShutdownHooks(); - - isReady = true; - } catch (Exception ex) { - log.error("error connection to HDFS.", ex); - isReady = false; - } - } - - @PreDestroy - public void cleanUp() { - config.getShutdownLock().readLock().lock(); - - try { - log.info("fileSystem.close() at cleanUp."); - flush(); - fileSystem.close(); - } catch (IOException e) { - log.error("fileSystem.close() at cleanUp.", e); - } finally { - config.getShutdownLock().readLock().unlock(); - } - } - - public void flush() { - log.info("Force flush ALL data, regardless of stall"); - bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic)); - } - - //if no new data comes in for a topic for a while, need to flush its buffer - public void flushStall() { - log.debug("Flush stall data"); - bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); - } - - public void saveMessages(TopicConfig topic, List> messages) { - String topicStr = topic.getName(); - - Map bufferMap = bufferLocal.get(); - final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); - - buffer.addData(messages); - - if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) { - buffer.flush(topicStr); - } else { - log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize()); - } - } - -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java deleted file mode 100644 index f3462e49..00000000 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ /dev/null @@ -1,174 +0,0 @@ -/* -* ============LICENSE_START======================================================= -* ONAP : DATALAKE -* ================================================================================ -* Copyright 2018 China Mobile -*================================================================================= -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. 
-* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* ============LICENSE_END========================================================= -*/ - -package org.onap.datalake.feeder.service; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.apache.commons.lang3.StringUtils; -import org.bson.Document; - -import org.json.JSONObject; -import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.dto.TopicConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.mongodb.bulk.BulkWriteError; -import com.mongodb.MongoBulkWriteException; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoClientOptions.Builder; -import com.mongodb.MongoCredential; -import com.mongodb.ServerAddress; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.InsertManyOptions; - -/** - * Service for using MongoDB - * - * @author Guobiao Mo - * - */ -@Service -public class MongodbService { - - private final Logger log = LoggerFactory.getLogger(this.getClass()); - - @Autowired - private ApplicationConfiguration config; - private boolean dbReady = false; - - @Autowired - private DbService dbService; - - private MongoDatabase database; - private MongoClient mongoClient; - private Map> mongoCollectionMap = new HashMap<>(); - private InsertManyOptions insertManyOptions; - - @PostConstruct - private void init() { - Db mongodb = dbService.getMongoDB(); - - String host = mongodb.getHost(); - - Integer port = mongodb.getPort(); - if (port == null || port == 0) { - port = 27017; //MongoDB default - } - - String databaseName = mongodb.getDatabase(); - String userName = mongodb.getLogin(); - String password = mongodb.getPass(); - - MongoCredential credential = null; - if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) { - credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray()); - } - - Builder builder = MongoClientOptions.builder(); - builder.serverSelectionTimeout(30000);//server selection timeout, in milliseconds - - //http://mongodb.github.io/mongo-java-driver/3.0/driver/reference/connecting/ssl/ - if (config.isEnableSSL()) { - builder.sslEnabled(Boolean.TRUE.equals(mongodb.getEncrypt()));// getEncrypt() can be null - } - MongoClientOptions options = builder.build(); - List addrs = new ArrayList(); - - addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address - - try { - if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) { - credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray()); - List credentialList = new ArrayList(); - credentialList.add(credential); - mongoClient = new MongoClient(addrs, credentialList, options); - } else { - mongoClient = new MongoClient(addrs, 
options); - } - } catch (Exception ex) { - dbReady = false; - log.error("Fail to initiate MongoDB" + mongodb.getHost()); - return; - } - database = mongoClient.getDatabase(mongodb.getDatabase()); - - insertManyOptions = new InsertManyOptions(); - insertManyOptions.ordered(false); - - dbReady = true; - } - - @PreDestroy - public void cleanUp() { - config.getShutdownLock().readLock().lock(); - - try { - log.info("mongoClient.close() at cleanUp."); - mongoClient.close(); - } finally { - config.getShutdownLock().readLock().unlock(); - } - } - - public void saveJsons(TopicConfig topic, List jsons) { - if (dbReady == false)//TOD throw exception - return; - List documents = new ArrayList<>(jsons.size()); - for (JSONObject json : jsons) { - //convert org.json JSONObject to MongoDB Document - Document doc = Document.parse(json.toString()); - - String id = topic.getMessageId(json); //id can be null - if (id != null) { - doc.put("_id", id); - } - documents.add(doc); - } - - String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . - MongoCollection collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); - - try { - collection.insertMany(documents, insertManyOptions); - } catch (MongoBulkWriteException e) { - List bulkWriteErrors = e.getWriteErrors(); - for (BulkWriteError bulkWriteError : bulkWriteErrors) { - log.error("Failed record: {}", bulkWriteError); - } - } - - log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); - } - -} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java index df701e88..408e4971 100755 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PortalDesignService.java @@ -23,15 +23,27 @@ package org.onap.datalake.feeder.service; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.DbType; import org.onap.datalake.feeder.domain.DesignType; import org.onap.datalake.feeder.domain.Portal; import org.onap.datalake.feeder.domain.PortalDesign; import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.domain.TopicName; import org.onap.datalake.feeder.dto.PortalDesignConfig; +import org.onap.datalake.feeder.enumeration.DbTypeEnum; +import org.onap.datalake.feeder.enumeration.DesignTypeEnum; import org.onap.datalake.feeder.repository.DesignTypeRepository; import org.onap.datalake.feeder.repository.PortalDesignRepository; +import org.onap.datalake.feeder.repository.TopicNameRepository; +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; import org.onap.datalake.feeder.util.HttpClientUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +63,11 @@ public class PortalDesignService { static String POST_FLAG; - @Autowired - private PortalDesignRepository portalDesignRepository; + @Autowired + private 
PortalDesignRepository portalDesignRepository; - @Autowired - private TopicService topicService; + @Autowired + private TopicNameRepository topicNameRepository; @Autowired private DesignTypeRepository designTypeRepository; @@ -63,17 +75,13 @@ public class PortalDesignService { @Autowired private ApplicationConfiguration applicationConfiguration; - @Autowired - private DbService dbService; - - public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception - { + public PortalDesign fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig) throws Exception { PortalDesign portalDesign = new PortalDesign(); fillPortalDesign(portalDesignConfig, portalDesign); return portalDesign; } - public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception - { + + public void fillPortalDesignConfiguration(PortalDesignConfig portalDesignConfig, PortalDesign portalDesign) throws Exception { fillPortalDesign(portalDesignConfig, portalDesign); } @@ -86,32 +94,34 @@ public class PortalDesignService { portalDesign.setSubmitted(portalDesignConfig.getSubmitted()); if (portalDesignConfig.getTopic() != null) { - Topic topic = topicService.getTopic(portalDesignConfig.getTopic()); - if (topic == null) throw new IllegalArgumentException("topic is null"); - portalDesign.setTopicName(topic.getTopicName()); - }else { - throw new IllegalArgumentException("Can not find topic in DB, topic name: "+portalDesignConfig.getTopic()); + Optional topicName = topicNameRepository.findById(portalDesignConfig.getTopic()); + if (topicName.isPresent()) { + portalDesign.setTopicName(topicName.get()); + } else { + throw new IllegalArgumentException("topic is null " + portalDesignConfig.getTopic()); + } + } else { + throw new IllegalArgumentException("Can not find topic in DB, topic name: " + portalDesignConfig.getTopic()); } if (portalDesignConfig.getDesignType() != null) { DesignType designType = designTypeRepository.findById(portalDesignConfig.getDesignType()).get(); - if (designType == null) throw new IllegalArgumentException("designType is null"); + if (designType == null) + throw new IllegalArgumentException("designType is null"); portalDesign.setDesignType(designType); - }else { - throw new IllegalArgumentException("Can not find designType in Design_type, designType name "+portalDesignConfig.getDesignType()); + } else { + throw new IllegalArgumentException("Can not find designType in Design_type, designType name " + portalDesignConfig.getDesignType()); } } - public PortalDesign getPortalDesign(Integer id) { - + Optional ret = portalDesignRepository.findById(id); return ret.isPresent() ? 
ret.get() : null; } - - public List queryAllPortalDesign(){ + public List queryAllPortalDesign() { List portalDesignList = null; List portalDesignConfigList = new ArrayList<>(); @@ -125,30 +135,21 @@ public class PortalDesignService { return portalDesignConfigList; } - - public boolean deploy(PortalDesign portalDesign){ - boolean flag =true; - String designTypeName = portalDesign.getDesignType().getName(); - if (portalDesign.getDesignType() != null && "kibana_db".equals(designTypeName)) { - flag = deployKibanaImport(portalDesign); - } else if (portalDesign.getDesignType() != null && "kibana_visual".equals(designTypeName)) { - //TODO - flag =false; - } else if (portalDesign.getDesignType() != null && "kibana_search".equals(designTypeName)) { - //TODO - flag = false; - } else if (portalDesign.getDesignType() != null && "es_mapping".equals(designTypeName)) { - flag = postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase()); - } else if (portalDesign.getDesignType() != null && "druid_kafka_spec".equals(designTypeName)) { - //TODO - flag =false; - } else { - flag =false; + public boolean deploy(PortalDesign portalDesign) { + DesignType designType = portalDesign.getDesignType(); + DesignTypeEnum designTypeEnum = DesignTypeEnum.valueOf(designType.getId()); + + switch (designTypeEnum) { + case KIBANA_DB: + return deployKibanaImport(portalDesign); + case ES_MAPPING: + return postEsMappingTemplate(portalDesign, portalDesign.getTopicName().getId().toLowerCase()); + default: + log.error("Not implemented {}", designTypeEnum); + return false; } - return flag; } - private boolean deployKibanaImport(PortalDesign portalDesign) throws RuntimeException { POST_FLAG = "KibanaDashboardImport"; String requestBody = portalDesign.getBody(); @@ -168,20 +169,16 @@ public class PortalDesignService { } - - private String kibanaImportUrl(String host, Integer port){ + private String kibanaImportUrl(String host, Integer port) { if (port == null) { port = applicationConfiguration.getKibanaPort(); } - return "http://"+host+":"+port+applicationConfiguration.getKibanaDashboardImportApi(); + return "http://" + host + ":" + port + applicationConfiguration.getKibanaDashboardImportApi(); } - /** - * successed resp: - * { - * "acknowledged": true - * } + * successed resp: { "acknowledged": true } + * * @param portalDesign * @param templateName * @return flag @@ -189,7 +186,13 @@ public class PortalDesignService { public boolean postEsMappingTemplate(PortalDesign portalDesign, String templateName) throws RuntimeException { POST_FLAG = "ElasticsearchMappingTemplate"; String requestBody = portalDesign.getBody(); - return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG); + + //FIXME + Set dbs = portalDesign.getDbs(); + //submit to each ES in dbs + + //return HttpClientUtil.sendPostHttpClient("http://"+dbService.getElasticsearch().getHost()+":9200/_template/"+templateName, requestBody, POST_FLAG); + return false; } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java index dc04cf60..65de0bdc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -50,7 +50,7 @@ public class PullService { private 
boolean isRunning = false; private ExecutorService executorService; - private Thread topicConfigPollingThread; +// private Thread topicConfigPollingThread; private Set pullers; @Autowired @@ -94,10 +94,11 @@ public class PullService { } } - topicConfigPollingThread = new Thread(topicConfigPollingService); + executorService.submit(topicConfigPollingService); + /*topicConfigPollingThread = new Thread(topicConfigPollingService); topicConfigPollingThread.setName("TopicConfigPolling"); topicConfigPollingThread.start(); - +*/ isRunning = true; Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); @@ -126,11 +127,12 @@ public class PullService { puller.shutdown(); } - logger.info("stop TopicConfigPollingService ..."); - topicConfigPollingService.shutdown(); +// logger.info("stop TopicConfigPollingService ..."); +// topicConfigPollingService.shutdown(); - topicConfigPollingThread.join(); + // topicConfigPollingThread.join(); + logger.info("stop executorService ..."); executorService.shutdown(); executorService.awaitTermination(120L, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java index 5cc3b55d..1550e531 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java @@ -29,7 +29,6 @@ import java.util.Properties; import javax.annotation.PostConstruct; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -54,7 +53,6 @@ import org.springframework.stereotype.Service; */ @Service -//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class Puller implements Runnable { @Autowired @@ -75,6 +73,9 @@ public class Puller implements Runnable { private Kafka kafka; + public Puller( ) { + + } public Puller(Kafka kafka) { this.kafka = kafka; } @@ -84,11 +85,11 @@ public class Puller implements Runnable { async = config.isAsync(); } - private Properties getConsumerConfig() {//00 + private Properties getConsumerConfig() { Properties consumerConfig = new Properties(); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokerList()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, kafka.getGroup()); consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId())); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -96,10 +97,10 @@ public class Puller implements Runnable { consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) { - String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;"; + 
if (kafka.isSecure()) { + String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + kafka.getLogin() + " password=" + kafka.getPass() + " serviceName=kafka;"; consumerConfig.put("sasl.jaas.config", jaas); - consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol()); + consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafka.getSecurityProtocol()); consumerConfig.put("sasl.mechanism", "PLAIN"); } return consumerConfig; @@ -120,8 +121,8 @@ public class Puller implements Runnable { try { while (active) { - if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well - List topics = topicConfigPollingService.getActiveTopics(kafka);//00 + if (topicConfigPollingService.isActiveTopicsChanged(kafka)) { + Collection topics = topicConfigPollingService.getActiveTopics(kafka); log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics); consumer.subscribe(topics, rebalanceListener); } @@ -141,7 +142,7 @@ public class Puller implements Runnable { KafkaConsumer consumer = consumerLocal.get(); log.debug("pulling..."); - ConsumerRecords records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(kafka.getTimeout())); log.debug("done pulling."); if (records != null && records.count() > 0) { @@ -153,7 +154,7 @@ public class Puller implements Runnable { messages.add(Pair.of(record.timestamp(), record.value())); //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); } - storeService.saveMessages(kafka, partition.topic(), messages);//00 + storeService.saveMessages(kafka, partition.topic(), messages); log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 291f1cad..f5a7698d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -22,7 +22,9 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; import javax.annotation.PostConstruct; @@ -32,13 +34,23 @@ import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.DbType; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.enumeration.DbTypeEnum; +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import 
org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; import org.onap.datalake.feeder.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,19 +73,10 @@ public class StoreService { private ApplicationConfiguration config; @Autowired - private TopicConfigPollingService configPollingService; - - @Autowired - private MongodbService mongodbService; + private ApplicationContext context; @Autowired - private CouchbaseService couchbaseService; - - @Autowired - private ElasticsearchService elasticsearchService; - - @Autowired - private HdfsService hdfsService; + private TopicConfigPollingService configPollingService; private ObjectMapper yamlReader; @@ -87,23 +90,41 @@ public class StoreService { return; } - TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr); + Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr); + for(EffectiveTopic effectiveTopic:effectiveTopics) { + saveMessagesForTopic(effectiveTopic, messages); + } + } + + private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) { + if (!effectiveTopic.getTopic().isEnabled()) { + log.error("we should not come here {}", effectiveTopic); + return; + } List<JSONObject> docs = new ArrayList<>(); for (Pair<Long, String> pair : messages) { try { - docs.add(messageToJson(topicConfig, pair)); + docs.add(messageToJson(effectiveTopic, pair)); } catch (Exception e) { //may see org.json.JSONException. log.error("Error when converting this message to JSON: " + pair.getRight(), e); } } - saveJsons(topicConfig, docs, messages); + Set<Db> dbs = effectiveTopic.getTopic().getDbs(); + + for (Db db : dbs) { + if (db.getDbType().isTool() || !db.isEnabled()) { + continue; + } + DbStoreService dbStoreService = findDbStoreService(db); + dbStoreService.saveJsons(effectiveTopic, docs); + } } - private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException { + private JSONObject messageToJson(EffectiveTopic effectiveTopic, Pair<Long, String> pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -114,11 +135,11 @@ public class StoreService { // log.debug("{} ={}", topicStr, text); //} - boolean storeRaw = topicConfig.isSaveRaw(); + boolean storeRaw = effectiveTopic.getTopic().isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topicConfig.getDataFormat2(); + DataFormat dataFormat = effectiveTopic.getTopic().getDataFormat2(); switch (dataFormat) { case JSON: @@ -149,15 +170,15 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } - if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { - String[] paths = topicConfig.getAggregateArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getAggregateArrayPath())) { + String[] paths = effectiveTopic.getTopic().getAggregateArrayPath2(); for (String path : paths) { JsonUtil.arrayAggregate(path, json); } } - if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { - String[] paths = topicConfig.getFlattenArrayPath2(); + if (StringUtils.isNotBlank(effectiveTopic.getTopic().getFlattenArrayPath())) { + String[] paths = effectiveTopic.getTopic().getFlattenArrayPath2(); for (String path : paths) { JsonUtil.flattenArray(path, json); } @@ -166,29 +187,29 @@ public class
StoreService { return json; } - private void saveJsons(TopicConfig topic, List jsons, List> messages) { - if (topic.supportMongoDB()) { - mongodbService.saveJsons(topic, jsons); - } - - if (topic.supportCouchbase()) { - couchbaseService.saveJsons(topic, jsons); - } - - if (topic.supportElasticsearch()) { - elasticsearchService.saveJsons(topic, jsons); - } - - if (topic.supportHdfs()) { - hdfsService.saveMessages(topic, messages); + private DbStoreService findDbStoreService(Db db) { + DbType dbType = db.getDbType(); + DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId()); + switch (dbTypeEnum) { + case CB: + return context.getBean(CouchbaseService.class, db); + case ES: + return context.getBean(ElasticsearchService.class, db); + case HDFS: + return context.getBean(HdfsService.class, db); + case MONGO: + return context.getBean(MongodbService.class, db); + default: + log.error("we should not come here {}", dbTypeEnum); + return null; } } public void flush() { //force flush all buffer - hdfsService.flush(); +// hdfsService.flush(); } public void flushStall() { //flush stall buffer - hdfsService.flushStall(); + // hdfsService.flushStall(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 453b3ee9..8f703b1d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -21,20 +21,23 @@ package org.onap.datalake.feeder.service; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.EffectiveTopic; import org.onap.datalake.feeder.domain.Kafka; -import org.onap.datalake.feeder.dto.TopicConfig; +import org.onap.datalake.feeder.repository.KafkaRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** @@ -52,45 +55,56 @@ public class TopicConfigPollingService implements Runnable { ApplicationConfiguration config; @Autowired - private DmaapService dmaapService; + private ApplicationContext context; - //effective TopicConfig Map - private Map effectiveTopicConfigMap = new HashMap<>(); + @Autowired + private KafkaRepository kafkaRepository; + + //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic. 
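+ //e.g. effectiveTopicMap.get(kafkaId).get(topicName) returns the EffectiveTopic list for that topic name on that Kafka cluster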
+ private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>(); + //private Map<String, TopicConfig> effectiveTopicConfigMap; //monitor Kafka topic list changes - private List<String> activeTopics; - private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1); - private int currentActiveTopicsVersion = -1; + private Map<String, Set<String>> activeTopicMap; + + private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>(); + private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>(); private boolean active = false; @PostConstruct private void init() { try { - log.info("init(), ccalling poll()..."); - activeTopics = poll(); - currentActiveTopicsVersion++; + log.info("init(), calling poll()..."); + activeTopicMap = poll(); } catch (Exception ex) { log.error("error connection to HDFS.", ex); } } - public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version - boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get(); - log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get()); - if (changed && update) { - activeTopicsVersionLocal.set(currentActiveTopicsVersion); + public boolean isActiveTopicsChanged(Kafka kafka) {//when changed, the local version is synced as well + String kafkaId = kafka.getId(); + int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version + int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0); + + boolean changed = currentActiveTopicsVersion > localActiveTopicsVersion; + log.debug("kafkaId={} isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", kafkaId, changed, currentActiveTopicsVersion, localActiveTopicsVersion); + if (changed) { + activeTopicsVersionLocal.get().put(kafkaId, currentActiveTopicsVersion); } return changed; } - public List<String> getActiveTopics(Kafka kafka) { - return activeTopics; + //get a list of topic names to monitor + public Collection<String> getActiveTopics(Kafka kafka) { + return activeTopicMap.get(kafka.getId()); } - public TopicConfig getEffectiveTopicConfig(String topicStr) { - return effectiveTopicConfigMap.get(topicStr); + //get the EffectiveTopics given kafka and topic name + public Collection<EffectiveTopic> getEffectiveTopic(Kafka kafka, String topicStr) { + Map<String, List<EffectiveTopic>> effectiveTopicMapKafka = effectiveTopicMap.get(kafka.getId()); + return effectiveTopicMapKafka.get(topicStr); } @Override @@ -100,7 +114,7 @@ public class TopicConfigPollingService implements Runnable { while (active) { try { //sleep first since we already poll in init() - Thread.sleep(config.getDmaapCheckNewTopicInterval()); + Thread.sleep(config.getCheckTopicInterval()); if(!active) { break; } @@ -110,15 +124,23 @@ public class TopicConfigPollingService implements Runnable { } try { - List<String> newTopics = poll(); - if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { - log.info("activeTopics list is updated, old={}", activeTopics); - log.info("activeTopics list is updated, new={}", newTopics); - - activeTopics = newTopics; - currentActiveTopicsVersion++; - } else { - log.debug("activeTopics list is not updated."); + Map<String, Set<String>> newTopicsMap = poll(); + + for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) { + String kafkaId = entry.getKey(); + Set<String> newTopics = entry.getValue(); + + Set<String> activeTopics = activeTopicMap.get(kafkaId); + + if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) { + log.info("activeTopics list is updated, old={}", activeTopics); + log.info("activeTopics list is updated, new={}",
newTopics); + + activeTopicMap.put(kafkaId, newTopics); + currentActiveTopicsVersionMap.put(kafkaId, currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1)+1); + } else { + log.debug("activeTopics list is not updated."); + } } } catch (IOException e) { log.error("dmaapService.getActiveTopics()", e); @@ -132,17 +154,27 @@ public class TopicConfigPollingService implements Runnable { active = false; } - private List poll() throws IOException { + private Map> poll() throws IOException { + Map> ret = new HashMap<>(); + Iterable kafkas = kafkaRepository.findAll(); + for (Kafka kafka : kafkas) { + if (kafka.isEnabled()) { + Set topics = poll(kafka); + ret.put(kafka.getId(), topics); + } + } + return ret; + } + + private Set poll(Kafka kafka) throws IOException { log.debug("poll(), use dmaapService to getActiveTopicConfigs..."); - List activeTopicConfigs = dmaapService.getActiveTopicConfigs(); - Map tempEffectiveTopicConfigMap = new HashMap<>(); - activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); - effectiveTopicConfigMap = tempEffectiveTopicConfigMap; - log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap); + DmaapService dmaapService = context.getBean(DmaapService.class, kafka); + + Map> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic(); + effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics); - List ret = new ArrayList<>(activeTopicConfigs.size()); - activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName())); + Set ret = activeEffectiveTopics.keySet(); return ret; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index dd8664e8..86b27a9a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -21,23 +21,31 @@ package org.onap.datalake.feeder.service; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; +import org.apache.commons.collections.CollectionUtils; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Kafka; import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.repository.DbRepository; +import org.onap.datalake.feeder.repository.TopicNameRepository; import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.db.ElasticsearchService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; /** - * Service for topics + * Service for topics * * @author Guobiao Mo * @@ -49,72 +57,90 @@ public class TopicService { @Autowired private ApplicationConfiguration config; - + @Autowired - private TopicRepository topicRepository; + private ApplicationContext context; @Autowired - private ElasticsearchService elasticsearchService; + private TopicNameRepository topicNameRepository; + @Autowired + private TopicRepository 
topicRepository; @Autowired private DbRepository dbRepository; - public TopicConfig getEffectiveTopic(String topicStr) { - try { - return getEffectiveTopic(topicStr, false); - } catch (IOException e) { - log.error(topicStr, e); + public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException { + + List<Topic> topics = findTopics(kafka, topicStr); + if (CollectionUtils.isEmpty(topics)) { + topics = new ArrayList<>(); + topics.add(getDefaultTopic(kafka)); } - return null; - } - public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { - Topic topic = getTopic(topicStr); - if (topic == null) { - topic = getDefaultTopic(); + List<EffectiveTopic> ret = new ArrayList<>(); + for (Topic topic : topics) { + if (!topic.isEnabled()) { + continue; + } + ret.add(new EffectiveTopic(topic, topicStr)); + + if (ensureTableExist) { + for (Db db : topic.getDbs()) { + if (db.isElasticsearch()) { + ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db); + elasticsearchService.ensureTableExist(topicStr); + } + } + } } - TopicConfig topicConfig = topic.getTopicConfig(); - topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic + + return ret; + } + + //TODO use query + public List<Topic> findTopics(Kafka kafka, String topicStr) { + List<Topic> ret = new ArrayList<>(); - if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) { - elasticsearchService.ensureTableExist(topicStr); + Iterable<Topic> allTopics = topicRepository.findAll(); + for(Topic topic: allTopics) { + if(topic.getKafkas().contains(kafka) && topic.getTopicName().getId().equals(topicStr)){ + ret.add(topic); + } } - return topicConfig; + return ret; } - public Topic getTopic(String topicStr) { - Optional<Topic> ret = topicRepository.findById(null);//FIXME + public Topic getTopic(int topicId) { + Optional<Topic> ret = topicRepository.findById(topicId); return ret.isPresent() ?
ret.get() : null; } - public Topic getDefaultTopic() { - return getTopic(config.getDefaultTopicName()); + public Topic getDefaultTopic(Kafka kafka) { + return findTopics(kafka, config.getDefaultTopicName()).get(0); } - public boolean istDefaultTopic(Topic topic) { + public boolean isDefaultTopic(Topic topic) { if (topic == null) { return false; } - return true;//topic.getName().equals(config.getDefaultTopicName()); + return topic.getName().equals(config.getDefaultTopicName()); } - public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) - { + public void fillTopicConfiguration(TopicConfig tConfig, Topic wTopic) { fillTopic(tConfig, wTopic); } - public Topic fillTopicConfiguration(TopicConfig tConfig) - { + public Topic fillTopicConfiguration(TopicConfig tConfig) { Topic topic = new Topic(); fillTopic(tConfig, topic); return topic; } - private void fillTopic(TopicConfig tConfig, Topic topic) - { + private void fillTopic(TopicConfig tConfig, Topic topic) { Set relateDb = new HashSet<>(); - //topic.setName(tConfig.getName()); + topic.setId(tConfig.getId()); + topic.setTopicName(topicNameRepository.findById(tConfig.getName()).get()); topic.setLogin(tConfig.getLogin()); topic.setPass(tConfig.getPassword()); topic.setEnabled(tConfig.isEnabled()); @@ -126,24 +152,21 @@ public class TopicService { topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); - if(tConfig.getSinkdbs() != null) { + if (tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { Db sinkdb = dbRepository.findByName(item); if (sinkdb != null) { relateDb.add(sinkdb); } } - if(relateDb.size() > 0) + if (relateDb.size() > 0) topic.setDbs(relateDb); - else if(relateDb.size() == 0) - { + else if (relateDb.size() == 0) { topic.getDbs().clear(); } - }else - { + } else { topic.setDbs(relateDb); } - } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java new file mode 100644 index 00000000..33c8847e --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java @@ -0,0 +1,180 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service.db; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.CouchbaseCluster; +import com.couchbase.client.java.document.JsonDocument; +import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.env.CouchbaseEnvironment; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; +import com.couchbase.client.java.error.DocumentAlreadyExistsException; + +import rx.Observable; +import rx.functions.Func1; + +/** + * Service to use Couchbase + * + * @author Guobiao Mo + * + */ +@Service +public class CouchbaseService implements DbStoreService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + ApplicationConfiguration config; + + private Db couchbase; +/* + @Autowired + private DbService dbService; + + private boolean isReady = false; +*/ + Bucket bucket; + + public CouchbaseService( ) { + + } + public CouchbaseService(Db db) { + couchbase = db; + } + + @PostConstruct + private void init() { + // Initialize Couchbase Connection + try { + //this tunes the SDK (to customize connection timeout) + CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s + .build(); + Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost()); + cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); + bucket = cluster.openBucket(couchbase.getDatabase()); + // Create a N1QL Primary Index (but ignore if it exists) + bucket.bucketManager().createN1qlPrimaryIndex(true, false); + + log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin()); +// isReady = true; + } catch (Exception ex) { + log.error("error connection to Couchbase.", ex); + // isReady = false; + } + } + + @PreDestroy + public void cleanUp() { + config.getShutdownLock().readLock().lock(); + + try { + log.info("bucket.close() at cleanUp."); + bucket.close(); + } finally { + config.getShutdownLock().readLock().unlock(); + } + } + + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List jsons) { + List documents = new ArrayList<>(jsons.size()); + for (JSONObject json : jsons) { + //convert to Couchbase JsonObject from org.json JSONObject + JsonObject jsonObject = JsonObject.fromJson(json.toString()); + + long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() + + //setup TTL + int expiry = (int) (timestamp / 1000L) + effectiveTopic.getTopic().getTtl() * 3600 * 24; //in second + + String id = getId(effectiveTopic.getTopic(), json); + JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); + documents.add(doc); + } + try { + saveDocuments(documents); + } catch (DocumentAlreadyExistsException e) { + log.error("Some or all the 
following ids are duplicates."); + for(JsonDocument document : documents) { + log.error("saveJsons() DocumentAlreadyExistsException {}", document.id()); + } + } catch (rx.exceptions.CompositeException e) { + List<Throwable> causes = e.getExceptions(); + for(Throwable cause : causes) { + log.error("saveJsons() CompositeException cause {}", cause.getMessage()); + } + } catch (Exception e) { + log.error("error saving to Couchbase.", e); + } + log.debug("saved text to topic = {}, this batch count = {} ", effectiveTopic, documents.size()); + } + + public String getId(Topic topic, JSONObject json) { + //if this topic requires extracting the id from the JSON message + String id = topic.getMessageId(json); + if (id != null) { + return id; + } + + String topicStr = topic.getName(); + id = topicStr+":"+UUID.randomUUID(); + + //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 + //atomically get the next sequence number: + // increment by 1, initialize at 0 if counter doc not found + //TODO how slow is this compared with the above UUID approach? + //sometimes this gives java.util.concurrent.TimeoutException + //JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 + //id = topicStr + ":" + nextIdNumber.content(); + + return id; + } + + //https://docs.couchbase.com/java-sdk/2.7/document-operations.html + private void saveDocuments(List<JsonDocument> documents) { + Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { + @Override + public Observable<JsonDocument> call(final JsonDocument docToInsert) { + return bucket.async().insert(docToInsert); + } + }).last().toBlocking().single(); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java new file mode 100644 index 00000000..5ea6e23e --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/DbStoreService.java @@ -0,0 +1,37 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License.
+* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service.db; + +import java.util.List; + +import org.json.JSONObject; +import org.onap.datalake.feeder.domain.EffectiveTopic; + +/** + * Interface for all db store services + * + * @author Guobiao Mo + * + */ +public interface DbStoreService { + + void saveJsons(EffectiveTopic topic, List jsons); +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java new file mode 100644 index 00000000..aee63ed7 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java @@ -0,0 +1,258 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service.db; + +import java.io.IOException; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.EffectiveTopic; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Elasticsearch Service for table creation, data submission, as well as data pre-processing. 
+ * + * @author Guobiao Mo + * + */ +@Service +public class ElasticsearchService implements DbStoreService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private Db elasticsearch; + + @Autowired + private ApplicationConfiguration config; + + //@Autowired +// private DbService dbService; + + private RestHighLevelClient client; + ActionListener listener; + + public ElasticsearchService( ) { + + } + public ElasticsearchService(Db db) { + elasticsearch = db; + } + + //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication + //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html + @PostConstruct + private void init() { + //Db elasticsearch = dbService.getElasticsearch(); + String elasticsearchHost = elasticsearch.getHost(); + + // Initialize the Connection + client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); + + log.info("Connected to Elasticsearch Host {}", elasticsearchHost); + + listener = new ActionListener() { + @Override + public void onResponse(BulkResponse bulkResponse) { + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } + } + + @Override + public void onFailure(Exception e) { + log.error(e.getMessage()); + } + }; + } + + @PreDestroy + public void cleanUp() throws IOException { + config.getShutdownLock().readLock().lock(); + + try { + log.info("cleanUp() closing Elasticsearch client."); + client.close(); + } catch (IOException e) { + log.error("client.close() at cleanUp.", e); + } finally { + config.getShutdownLock().readLock().unlock(); + } + } + + public void ensureTableExist(String topic) throws IOException { + String topicLower = topic.toLowerCase(); + + GetIndexRequest request = new GetIndexRequest(topicLower); + + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); + if (!exists) { + //TODO submit mapping template + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged()); + } + } + + //TTL is not supported in Elasticsearch 5.0 and later, what can we do? 
FIXME + @Override + public void saveJsons(EffectiveTopic effectiveTopic, List jsons) { + + BulkRequest request = new BulkRequest(); + + for (JSONObject json : jsons) { + if (effectiveTopic.getTopic().isCorrelateClearedMessage()) { + boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json); + if (found) { + continue; + } + } + + String id = effectiveTopic.getTopic().getMessageId(json); //id can be null + + request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); + } + + log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size()); + + if (config.isAsync()) { + client.bulkAsync(request, RequestOptions.DEFAULT, listener); + } else { + try { + BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } + } catch (IOException e) { + log.error(effectiveTopic.getName(), e); + } + } + + } + + /** + * + * @param topic + * @param json + * @return boolean + * + * Because of query by id, The search API cannot be used for query. The + * search API can only query all data or based on the fields in the + * source. So use the get API, three parameters: index, type, document + * id + */ + private boolean correlateClearedMessage(Topic topic, JSONObject json) { + boolean found = false; + String eName = null; + + try { + eName = json.query("/event/commonEventHeader/eventName").toString(); + + if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) { + + String name = eName.substring(0, eName.length() - 7); + String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); + String specificProblem = json.query("/event/faultFields/specificProblem").toString(); + + String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb" + String index = topic.getName().toLowerCase(); + + //get + GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); + + GetResponse getResponse = null; + try { + getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse != null) { + + if (getResponse.isExists()) { + String sourceAsString = getResponse.getSourceAsString(); + JSONObject jsonObject = new JSONObject(sourceAsString); + jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); + String jsonString = jsonObject.toString(); + + //update + IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); + request.source(jsonString, XContentType.JSON); + IndexResponse indexResponse = null; + try { + indexResponse = client.index(request, RequestOptions.DEFAULT); + found = true; + } catch (IOException e) { + log.error("save failure"); + } + } else { + log.error("The getResponse was not exists"); + } + + } else { + log.error("The document for this id was not found"); + } + + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + log.error("The document for this id was not found"); + } + if (e.status() == RestStatus.CONFLICT) { + log.error("Version conflict"); + } + log.error("Get document exception", e); + } catch (IOException e) { + log.error(topic.getName(), e); + } + + } + + } catch (Exception e) { + log.error("error", e); + } + + return found; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java 
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
new file mode 100644
index 00000000..0e107fdf
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -0,0 +1,249 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service.db;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.onap.datalake.feeder.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Service to write data to HDFS
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class HdfsService implements DbStoreService {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private Db hdfs;
+
+    @Autowired
+    ApplicationConfiguration config;
+
+    FileSystem fileSystem;
+    private boolean isReady = false;
+
+    private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
+    private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
+    private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
+
+    @Setter
+    @Getter
+    private class Buffer {
+        long lastFlush;
+        List<String> data;
+
+        public Buffer() {
+            lastFlush = Long.MIN_VALUE;
+            data = new ArrayList<>();
+        }
+
+        public void flush(String topic) {
+            try {
+                if (!data.isEmpty()) {
+                    saveMessages(topic, data);
+                    data.clear();
+                    lastFlush = System.currentTimeMillis();
+                }
+            } catch (IOException e) {
+                log.error("{} error saving to HDFS. {}", topic, e.getMessage());
+            }
+        }
+
+        public void flushStall(String topic) {
+            if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
+                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
+                flush(topic);
+            }
+        }
+
+        public void addData(List<Pair<Long, String>> messages) {
+            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+                lastFlush = System.currentTimeMillis();
+            }
+
+            messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+        }
+
+        public void addData2(List<JSONObject> messages) {
+            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+                lastFlush = System.currentTimeMillis();
+            }
+
+            messages.stream().forEach(message -> data.add(message.toString()));
+        }
+
+        private void saveMessages(String topic, List<String> bufferList) throws IOException {
+
+            long thread = Thread.currentThread().getId();
+            Date date = new Date();
+            String day = dayFormat.get().format(date);
+            String time = timeFormat.get().format(date);
+
+            InetAddress inetAddress = InetAddress.getLocalHost();
+            String hostName = inetAddress.getHostName();
+
+            String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
+            Path path = new Path(filePath);
+            log.debug("writing {} to HDFS {}", bufferList.size(), filePath);
+
+            // Create a new file and write data to it.
+            FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
+
+            bufferList.stream().forEach(message -> {
+                try {
+                    out.writeUTF(message);
+                    out.write('\n');
+                } catch (IOException e) {
+                    log.error("error writing to HDFS. {}", e.getMessage());
+                }
+            });
+
+            out.close();
+            log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
+        }
+    }
+
+    public HdfsService() {
+    }
+
+    public HdfsService(Db db) {
+        hdfs = db;
+    }
+
+    @PostConstruct
+    private void init() {
+        // Initialize HDFS Connection
+        try {
+            //Get configuration of Hadoop system
+            Configuration hdfsConfig = new Configuration();
+
+            int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();
+
+            String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
+            hdfsConfig.set("fs.defaultFS", hdfsuri);
+            System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());
+
+            log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());
+
+            fileSystem = FileSystem.get(hdfsConfig);
+
+            //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
+            ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
+            hadoopShutdownHookManager.clearShutdownHooks();
+
+            isReady = true;
+        } catch (Exception ex) {
+            log.error("error connecting to HDFS.", ex);
+            isReady = false;
+        }
+    }
+
+    @PreDestroy
+    public void cleanUp() {
+        config.getShutdownLock().readLock().lock();
+
+        try {
+            log.info("fileSystem.close() at cleanUp.");
+            flush();
+            fileSystem.close();
+        } catch (IOException e) {
+            log.error("fileSystem.close() at cleanUp.", e);
+        } finally {
+            config.getShutdownLock().readLock().unlock();
+        }
+    }
+
+    public void flush() {
+        log.info("Force flush ALL data, regardless of stall");
+        bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
+    }
+
+    //if no new data comes in for a topic for a while, need to flush its buffer
+    public void flushStall() {
+        log.debug("Flush stall data");
+        bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
+    }
+
+    //used if raw data should be saved
+    public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
+        String topicStr = topic.getName();
+
+        Map<String, Buffer> bufferMap = bufferLocal.get();
+        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+        buffer.addData(messages);
+
+        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+            buffer.flush(topicStr);
+        } else {
+            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+        }
+    }
+
+    @Override
+    public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
+        String topicStr = topic.getName();
+
+        Map<String, Buffer> bufferMap = bufferLocal.get();
+        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+        buffer.addData2(jsons);
+
+        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
+            buffer.flush(topicStr);
+        } else {
+            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
+        }
+
+    }
+
+}
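For readers of the buffering code above: each worker thread keeps one Buffer per topic and writes to HDFS either when a batch fills (or immediately in synchronous mode) or when flushStall() notices the buffer has been idle longer than the configured flush interval. A small standalone restatement of that decision follows (illustrative only, not part of the patch; the helper names are invented):

// Illustration only: the flush conditions implemented by HdfsService above.
public class HdfsFlushRuleExample {

    // mirrors saveJsons()/saveMessages(): flush now when not async or the batch is full
    static boolean shouldFlushNow(boolean async, int bufferSize, int batchSize) {
        return !async || bufferSize >= batchSize;
    }

    // mirrors Buffer.flushStall(): flush when the buffer has been idle longer than the interval
    static boolean shouldFlushStall(long lastFlushMillis, long nowMillis, long flushIntervalMillis) {
        return nowMillis - lastFlushMillis > flushIntervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlushNow(true, 400, 500));        // false - keep buffering
        System.out.println(shouldFlushNow(false, 1, 500));         // true  - synchronous mode flushes at once
        System.out.println(shouldFlushStall(0, 300_001, 300_000)); // true  - idle too long
    }
}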
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
new file mode 100644
index 00000000..0f522f6b
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -0,0 +1,180 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service.db;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.StringUtils;
+import org.bson.Document;
+
+import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.EffectiveTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientOptions;
+import com.mongodb.MongoClientOptions.Builder;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
+
+/**
+ * Service for using MongoDB
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class MongodbService implements DbStoreService {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private Db mongodb;
+
+    @Autowired
+    private ApplicationConfiguration config;
+    private boolean dbReady = false;
+
+    //@Autowired
+//  private DbService dbService;
+
+    private MongoDatabase database;
+    private MongoClient mongoClient;
+    private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+    private InsertManyOptions insertManyOptions;
+
+    public MongodbService() {
+    }
+
+    public MongodbService(Db db) {
+        mongodb = db;
+    }
+
+    @PostConstruct
+    private void init() {
+        String host = mongodb.getHost();
+
+        Integer port = mongodb.getPort();
+        if (port == null || port == 0) {
+            port = 27017; //MongoDB default
+        }
+
+        String databaseName = mongodb.getDatabase();
+        String userName = mongodb.getLogin();
+        String password = mongodb.getPass();
+
+        MongoCredential credential = null;
+        if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) {
+            credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray());
+        }
+
+        Builder builder = MongoClientOptions.builder();
+        builder.serverSelectionTimeout(30000);//server selection timeout, in milliseconds
+
+        //http://mongodb.github.io/mongo-java-driver/3.0/driver/reference/connecting/ssl/
+        if (config.isEnableSSL()) {
+            builder.sslEnabled(Boolean.TRUE.equals(mongodb.getEncrypt()));// getEncrypt() can be null
+        }
+        MongoClientOptions options = builder.build();
+        List<ServerAddress> addrs = new ArrayList<>();
+
+        addrs.add(new ServerAddress(host, port)); // FIXME should support a list of addresses
+
+        try {
+            if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) {
+                credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray());
+                List<MongoCredential> credentialList = new ArrayList<>();
+                credentialList.add(credential);
+                mongoClient = new MongoClient(addrs, credentialList, options);
+            } else {
+                mongoClient = new MongoClient(addrs, options);
+            }
+        } catch (Exception ex) {
+            dbReady = false;
+            log.error("Failed to initiate MongoDB " + mongodb.getHost(), ex);
+            return;
+        }
+        database = mongoClient.getDatabase(mongodb.getDatabase());
+
+        insertManyOptions = new InsertManyOptions();
+        insertManyOptions.ordered(false);
+
+        dbReady = true;
+    }
+
+    @PreDestroy
+    public void cleanUp() {
+        config.getShutdownLock().readLock().lock();
+
+        try {
+            log.info("mongoClient.close() at cleanUp.");
+            mongoClient.close();
+        } finally {
+            config.getShutdownLock().readLock().unlock();
+        }
+    }
+
+    public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
+        if (!dbReady) //TODO throw exception
+            return;
+        List<Document> documents = new ArrayList<>(jsons.size());
+        for (JSONObject json : jsons) {
+            //convert org.json JSONObject to MongoDB Document
+            Document doc = Document.parse(json.toString());
+
+            String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
+            if (id != null) {
+                doc.put("_id", id);
+            }
+            documents.add(doc);
+        }
+
+        String collectionName = effectiveTopic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
+        MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
+
+        try {
+            collection.insertMany(documents, insertManyOptions);
+        } catch (MongoBulkWriteException e) {
+            List<BulkWriteError> bulkWriteErrors = e.getWriteErrors();
+            for (BulkWriteError bulkWriteError : bulkWriteErrors) {
+                log.error("Failed record: {}", bulkWriteError);
+            }
+        }
+
+        log.debug("saved text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
+    }
+
+}
--
cgit 1.2.3-korg
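A closing illustration for readers of the MongodbService code above (standalone Java, not part of the patch; the topic name is a made-up example): non-alphanumeric characters are stripped from the topic name to form the collection name, and when a topic defines message-id fields the value becomes the document _id, so replayed messages surface as duplicate-key errors in the bulk-write log rather than silently creating copies.

// Illustration only: collection naming as done by MongodbService.saveJsons() above.
public class MongoCollectionNameExample {
    public static void main(String[] args) {
        String topicName = "unauthenticated.SEC_FAULT_OUTPUT"; // hypothetical topic name
        String collectionName = topicName.replaceAll("[^a-zA-Z0-9]", "");
        System.out.println(collectionName); // prints: unauthenticatedSECFAULTOUTPUT
    }
}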