From e5c87e6ddfc5e727b1b4d11e994ad242fd366534 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Wed, 10 Jul 2019 01:34:20 -0700 Subject: supports multiple Kafka clusters and DBs Change kafka table id to int Issue-ID: DCAEGEN2-1631 Change-Id: Ib5109b75a387f76709dc38161bf2d7ef084950ef Signed-off-by: Guobiao Mo --- .../feeder/controller/KafkaController.java | 4 +-- .../feeder/controller/TopicController.java | 2 +- .../org/onap/datalake/feeder/domain/Kafka.java | 11 ++++--- .../org/onap/datalake/feeder/domain/Topic.java | 4 +-- .../org/onap/datalake/feeder/dto/KafkaConfig.java | 4 +-- .../org/onap/datalake/feeder/dto/TopicConfig.java | 6 +--- .../datalake/feeder/enumeration/DbTypeEnum.java | 21 +++++++++++-- .../feeder/repository/KafkaRepository.java | 2 +- .../onap/datalake/feeder/service/DbService.java | 36 +++++++--------------- .../onap/datalake/feeder/service/KafkaService.java | 2 +- .../org/onap/datalake/feeder/service/Puller.java | 3 +- .../feeder/service/TopicConfigPollingService.java | 24 +++++++-------- .../onap/datalake/feeder/service/TopicService.java | 4 +-- .../feeder/service/db/CouchbaseService.java | 1 + .../feeder/service/db/ElasticsearchService.java | 2 +- .../datalake/feeder/service/db/MongodbService.java | 4 +-- 16 files changed, 65 insertions(+), 65 deletions(-) (limited to 'components/datalake-handler/feeder/src/main/java/org') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java index 901adf29..8d1bf316 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java @@ -87,7 +87,7 @@ public class KafkaController { @PutMapping("/{id}") @ResponseBody @ApiOperation(value="Update a kafka.") - public PostReturnBody updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException { + public PostReturnBody updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException { if (result.hasErrors()) { sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); @@ -116,7 +116,7 @@ public class KafkaController { @DeleteMapping("/{id}") @ResponseBody @ApiOperation(value="delete a kafka.") - public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{ + public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{ Kafka oldKafka = kafkaService.getKafkaById(id); if (oldKafka == null) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 1162aedd..bb0de4b0 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -90,7 +90,7 @@ public class TopicController { @GetMapping("/dmaap/{kafkaId}") @ResponseBody @ApiOperation(value = "List all topic names in DMaaP.") - public List listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) { + public List listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) { Kafka kafka = kafkaRepository.findById(kafkaId).get(); DmaapService dmaapService = context.getBean(DmaapService.class, kafka); return dmaapService.getTopics(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java index 2741c638..7f7b59e8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java @@ -24,6 +24,8 @@ import java.util.Set; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.FetchType; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.JoinTable; @@ -47,8 +49,9 @@ import org.onap.datalake.feeder.dto.KafkaConfig; @Table(name = "kafka") public class Kafka { @Id - @Column(name="`id`") - private String id; + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "`id`") + private int id; @Column(name="`name`", nullable = false) private String name; @@ -117,12 +120,12 @@ public class Kafka { if (this.getClass() != obj.getClass()) return false; - return id.equals(((Kafka) obj).getId()); + return id == ((Kafka) obj).getId(); } @Override public int hashCode() { - return id.hashCode(); + return id; } public KafkaConfig getKafkaConfig() { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 13e0163e..5d0c7625 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -213,13 +213,13 @@ public class Topic { tConfig.setEnabledSinkdbs(enabledDbList); Set topicKafka = getKafkas(); - List kafkaList = new ArrayList<>(); + List kafkaList = new ArrayList<>(); if (topicKafka != null) { for (Kafka kafka : topicKafka) { kafkaList.add(kafka.getId()); } } - tConfig.setSinkKafkas(kafkaList); + tConfig.setKafkas(kafkaList); return tConfig; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java index b158d167..f5e9539c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java @@ -23,8 +23,6 @@ package org.onap.datalake.feeder.dto; import lombok.Getter; import lombok.Setter; -import java.util.List; - /** * JSON request body for Kafka Config. * @@ -35,7 +33,7 @@ import java.util.List; @Setter public class KafkaConfig { - private String id; + private int id; private String name; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index a51103b7..6a262ca8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -25,10 +25,6 @@ import lombok.Setter; import java.util.List; -import org.apache.commons.lang3.StringUtils; -import org.json.JSONObject; -import org.onap.datalake.feeder.enumeration.DataFormat; - /** * JSON request body for Topic manipulation. * @@ -55,7 +51,7 @@ public class TopicConfig { private String messageIdPath; private String aggregateArrayPath; private String flattenArrayPath; - private List sinkKafkas; + private List kafkas; @Override public String toString() { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java index 05d76d55..39d02d36 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java @@ -19,6 +19,12 @@ */ package org.onap.datalake.feeder.enumeration; +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; + /** * Database type * @@ -26,12 +32,23 @@ package org.onap.datalake.feeder.enumeration; * */ public enum DbTypeEnum { - CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset"); + CB("Couchbase", CouchbaseService.class) + , DRUID("Druid", null) + , ES("Elasticsearch", ElasticsearchService.class) + , HDFS("HDFS", HdfsService.class) + , MONGO("MongoDB", MongodbService.class) + , KIBANA("Kibana", null) + , SUPERSET("Superset", null); private final String name; + private final Class serviceClass; - DbTypeEnum(String name) { + DbTypeEnum(String name, Class serviceClass) { this.name = name; + this.serviceClass = serviceClass; } + public Class getServiceClass(){ + return serviceClass; + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java index 8e78e5c2..6ce23ba7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java @@ -30,6 +30,6 @@ import org.springframework.data.repository.CrudRepository; * */ -public interface KafkaRepository extends CrudRepository { +public interface KafkaRepository extends CrudRepository { } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index addd0606..d54bf3f4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -26,11 +26,7 @@ import java.util.Map; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.DbType; import org.onap.datalake.feeder.enumeration.DbTypeEnum; -import org.onap.datalake.feeder.service.db.CouchbaseService; import org.onap.datalake.feeder.service.db.DbStoreService; -import org.onap.datalake.feeder.service.db.ElasticsearchService; -import org.onap.datalake.feeder.service.db.HdfsService; -import org.onap.datalake.feeder.service.db.MongodbService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -53,34 +49,24 @@ public class DbService { private Map dbStoreServiceMap = new HashMap<>(); public DbStoreService findDbStoreService(Db db) { - DbStoreService ret = dbStoreServiceMap.get(db.getId()); - if (ret != null) { - return ret; + int dbId = db.getId(); + if (dbStoreServiceMap.containsKey(dbId)) { + return dbStoreServiceMap.get(dbId); } DbType dbType = db.getDbType(); DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId()); - switch (dbTypeEnum) { - case CB: - ret = context.getBean(CouchbaseService.class, db); - break; - case ES: - ret = context.getBean(ElasticsearchService.class, db); - break; - case HDFS: - ret = context.getBean(HdfsService.class, db); - break; - case MONGO: - ret = context.getBean(MongodbService.class, db); - break; - default: + Class serviceClass = dbTypeEnum.getServiceClass(); + + if (serviceClass == null) { log.error("Should not have come here {}", db); - ret = null; + dbStoreServiceMap.put(dbId, null); + return null; } - - dbStoreServiceMap.put(db.getId(), ret); + + DbStoreService ret = context.getBean(serviceClass, db); + dbStoreServiceMap.put(dbId, ret); return ret; } - } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java index 58ee9087..2e959fa2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java @@ -39,7 +39,7 @@ public class KafkaService { @Autowired private KafkaRepository kafkaRepository; - public Kafka getKafkaById(String id) { + public Kafka getKafkaById(int id) { Optional ret = kafkaRepository.findById(id); return ret.isPresent() ? ret.get() : null; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java index 151ea3d6..ab99ad09 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java @@ -68,6 +68,7 @@ public class Puller implements Runnable { private final Logger log = LoggerFactory.getLogger(this.getClass()); + //KafkaConsumer is not thread-safe. private ThreadLocal> consumerLocal = new ThreadLocal<>(); // is key-value type, in our case key is empty, value is JSON text private boolean active = false; @@ -156,7 +157,7 @@ public class Puller implements Runnable { storeService.saveMessages(kafka, partition.topic(), messages); log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + if (!async) {//for reliability, sync commit offset to Kafka right after saving the data to data store, this slows down a bit long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 6ca8c139..a02cd6a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -61,15 +61,15 @@ public class TopicConfigPollingService implements Runnable { private KafkaRepository kafkaRepository; //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic. - private Map>> effectiveTopicMap = new HashMap<>(); + private Map>> effectiveTopicMap = new HashMap<>(); //private Map effectiveTopicConfigMap; - //monitor Kafka topic list changes - private Map> activeTopicMap; + //monitor Kafka topic list changes, key is kafka id, value is active Topics + private Map> activeTopicMap; - private ThreadLocal> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version - private Map currentActiveTopicsVersionMap = new HashMap<>();//topic name:version - private Map dmaapServiceMap = new HashMap<>();//kafka id:DmaapService + private ThreadLocal> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version + private Map currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version + private Map dmaapServiceMap = new HashMap<>();//kafka id:DmaapService private boolean active = false; @@ -84,7 +84,7 @@ public class TopicConfigPollingService implements Runnable { } public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version - String kafkaId = kafka.getId(); + int kafkaId = kafka.getId(); int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0); @@ -125,10 +125,10 @@ public class TopicConfigPollingService implements Runnable { } try { - Map> newTopicsMap = poll(); + Map> newTopicsMap = poll(); - for(Map.Entry> entry:newTopicsMap.entrySet()) { - String kafkaId = entry.getKey(); + for(Map.Entry> entry:newTopicsMap.entrySet()) { + Integer kafkaId = entry.getKey(); Set newTopics = entry.getValue(); Set activeTopics = activeTopicMap.get(kafkaId); @@ -155,8 +155,8 @@ public class TopicConfigPollingService implements Runnable { active = false; } - private Map> poll() throws IOException { - Map> ret = new HashMap<>(); + private Map> poll() throws IOException { + Map> ret = new HashMap<>(); Iterable kafkas = kafkaRepository.findAll(); for (Kafka kafka : kafkas) { if (kafka.isEnabled()) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 73f6293f..ed9b5c20 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -172,8 +172,8 @@ public class TopicService { } Set relateKafka = new HashSet<>(); - if (tConfig.getSinkKafkas() != null) { - for (String item : tConfig.getSinkKafkas()) { + if (tConfig.getKafkas() != null) { + for (int item : tConfig.getKafkas()) { Optional sinkKafka = kafkaRepository.findById(item); if (sinkKafka.isPresent()) { relateKafka.add(sinkKafka.get()); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java index f2ac5e94..44b940a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java @@ -66,6 +66,7 @@ public class CouchbaseService implements DbStoreService { ApplicationConfiguration config; private Db couchbase; + //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html Bucket bucket; public CouchbaseService(Db db) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java index 18b7e2fb..e303fa9b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java @@ -73,7 +73,7 @@ public class ElasticsearchService implements DbStoreService { @Autowired private ApplicationConfiguration config; - private RestHighLevelClient client; + private RestHighLevelClient client;//thread safe ActionListener listener; public ElasticsearchService(Db db) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java index a044790e..8677c6f6 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java @@ -72,11 +72,9 @@ public class MongodbService implements DbStoreService { private ApplicationConfiguration config; private boolean dbReady = false; - //@Autowired -// private DbService dbService; - private MongoDatabase database; private MongoClient mongoClient; + //MongoCollection is ThreadSafe private Map> mongoCollectionMap = new HashMap<>(); private InsertManyOptions insertManyOptions; -- cgit 1.2.3-korg