diff options
author | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-07-10 01:34:20 -0700 |
---|---|---|
committer | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-07-10 01:34:20 -0700 |
commit | e5c87e6ddfc5e727b1b4d11e994ad242fd366534 (patch) | |
tree | 5802f76f19549f08a7513e7e2673cfdd3177ce81 /components/datalake-handler/feeder/src | |
parent | 7a094862a6a679b022bf777d3f15e32a048ba4d8 (diff) |
supports multiple Kafka clusters and DBs
Change kafka table id to int
Issue-ID: DCAEGEN2-1631
Change-Id: Ib5109b75a387f76709dc38161bf2d7ef084950ef
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src')
21 files changed, 76 insertions, 74 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index 02f2343c..1fd9aa84 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -32,7 +32,7 @@ CREATE TABLE `db` ( PRIMARY KEY (`id`),
KEY `FK3njadtw43ieph7ftt4kxdhcko` (`db_type_id`),
CONSTRAINT `FK3njadtw43ieph7ftt4kxdhcko` FOREIGN KEY (`db_type_id`) REFERENCES `db_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `portal` (
`name` varchar(255) NOT NULL,
@@ -73,10 +73,10 @@ CREATE TABLE `design` ( KEY `FKabb8e74230glxpaiai4aqsr34` (`topic_name_id`),
CONSTRAINT `FKabb8e74230glxpaiai4aqsr34` FOREIGN KEY (`topic_name_id`) REFERENCES `topic_name` (`id`),
CONSTRAINT `FKo43yi6aputq6kwqqu8eqbspm5` FOREIGN KEY (`design_type_id`) REFERENCES `design_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `kafka` (
- `id` varchar(255) NOT NULL,
+ `id` int(11) NOT NULL AUTO_INCREMENT,
`broker_list` varchar(255) NOT NULL,
`consumer_count` int(11) DEFAULT 3,
`enabled` bit(1) NOT NULL,
@@ -130,7 +130,7 @@ CREATE TABLE `map_db_topic` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `map_kafka_topic` (
- `kafka_id` varchar(255) NOT NULL,
+ `kafka_id` int(11) NOT NULL,
`topic_id` int(11) NOT NULL,
PRIMARY KEY (`topic_id`,`kafka_id`),
KEY `FKtdrme4h7rxfh04u2i2wqu23g5` (`kafka_id`),
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql index 0605e0e9..770c68bf 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql @@ -13,7 +13,7 @@ INSERT INTO datalake.kafka( ,timeout_sec
,zk
) VALUES (
- 'KAFKA_1'
+ 1
,'main Kafka cluster' -- name - IN varchar(255)
,3 -- consumer_count - IN int(11)
,1 -- enabled - IN bit(1)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java index 901adf29..8d1bf316 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java @@ -87,7 +87,7 @@ public class KafkaController { @PutMapping("/{id}") @ResponseBody @ApiOperation(value="Update a kafka.") - public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException { + public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException { if (result.hasErrors()) { sendError(response, 400, "Error parsing KafkaConfig : "+result.toString()); @@ -116,7 +116,7 @@ public class KafkaController { @DeleteMapping("/{id}") @ResponseBody @ApiOperation(value="delete a kafka.") - public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{ + public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{ Kafka oldKafka = kafkaService.getKafkaById(id); if (oldKafka == null) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index 1162aedd..bb0de4b0 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -90,7 +90,7 @@ public class TopicController { @GetMapping("/dmaap/{kafkaId}") @ResponseBody @ApiOperation(value = "List all topic names in DMaaP.") - public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) { + public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) { Kafka kafka = kafkaRepository.findById(kafkaId).get(); DmaapService dmaapService = context.getBean(DmaapService.class, kafka); return dmaapService.getTopics(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java index 2741c638..7f7b59e8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java @@ -24,6 +24,8 @@ import java.util.Set; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.FetchType; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.JoinTable; @@ -47,8 +49,9 @@ import org.onap.datalake.feeder.dto.KafkaConfig; @Table(name = "kafka") public class Kafka { @Id - @Column(name="`id`") - private String id; + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = "`id`") + private int id; @Column(name="`name`", nullable = false) private String name; @@ -117,12 +120,12 @@ public class Kafka { if (this.getClass() != obj.getClass()) return false; - return id.equals(((Kafka) obj).getId()); + return id == ((Kafka) obj).getId(); } @Override public int hashCode() { - return id.hashCode(); + return id; } public KafkaConfig getKafkaConfig() { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 13e0163e..5d0c7625 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -213,13 +213,13 @@ public class Topic { tConfig.setEnabledSinkdbs(enabledDbList); Set<Kafka> topicKafka = getKafkas(); - List<String> kafkaList = new ArrayList<>(); + List<Integer> kafkaList = new ArrayList<>(); if (topicKafka != null) { for (Kafka kafka : topicKafka) { kafkaList.add(kafka.getId()); } } - tConfig.setSinkKafkas(kafkaList); + tConfig.setKafkas(kafkaList); return tConfig; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java index b158d167..f5e9539c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java @@ -23,8 +23,6 @@ package org.onap.datalake.feeder.dto; import lombok.Getter; import lombok.Setter; -import java.util.List; - /** * JSON request body for Kafka Config. * @@ -35,7 +33,7 @@ import java.util.List; @Setter public class KafkaConfig { - private String id; + private int id; private String name; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index a51103b7..6a262ca8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -25,10 +25,6 @@ import lombok.Setter; import java.util.List; -import org.apache.commons.lang3.StringUtils; -import org.json.JSONObject; -import org.onap.datalake.feeder.enumeration.DataFormat; - /** * JSON request body for Topic manipulation. * @@ -55,7 +51,7 @@ public class TopicConfig { private String messageIdPath; private String aggregateArrayPath; private String flattenArrayPath; - private List<String> sinkKafkas; + private List<Integer> kafkas; @Override public String toString() { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java index 05d76d55..39d02d36 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java @@ -19,6 +19,12 @@ */ package org.onap.datalake.feeder.enumeration; +import org.onap.datalake.feeder.service.db.CouchbaseService; +import org.onap.datalake.feeder.service.db.DbStoreService; +import org.onap.datalake.feeder.service.db.ElasticsearchService; +import org.onap.datalake.feeder.service.db.HdfsService; +import org.onap.datalake.feeder.service.db.MongodbService; + /** * Database type * @@ -26,12 +32,23 @@ package org.onap.datalake.feeder.enumeration; * */ public enum DbTypeEnum { - CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset"); + CB("Couchbase", CouchbaseService.class) + , DRUID("Druid", null) + , ES("Elasticsearch", ElasticsearchService.class) + , HDFS("HDFS", HdfsService.class) + , MONGO("MongoDB", MongodbService.class) + , KIBANA("Kibana", null) + , SUPERSET("Superset", null); private final String name; + private final Class<? extends DbStoreService> serviceClass; - DbTypeEnum(String name) { + DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) { this.name = name; + this.serviceClass = serviceClass; } + public Class<? extends DbStoreService> getServiceClass(){ + return serviceClass; + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java index 8e78e5c2..6ce23ba7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java @@ -30,6 +30,6 @@ import org.springframework.data.repository.CrudRepository; *
*/
-public interface KafkaRepository extends CrudRepository<Kafka, String> {
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index addd0606..d54bf3f4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -26,11 +26,7 @@ import java.util.Map; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.DbType; import org.onap.datalake.feeder.enumeration.DbTypeEnum; -import org.onap.datalake.feeder.service.db.CouchbaseService; import org.onap.datalake.feeder.service.db.DbStoreService; -import org.onap.datalake.feeder.service.db.ElasticsearchService; -import org.onap.datalake.feeder.service.db.HdfsService; -import org.onap.datalake.feeder.service.db.MongodbService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -53,34 +49,24 @@ public class DbService { private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>(); public DbStoreService findDbStoreService(Db db) { - DbStoreService ret = dbStoreServiceMap.get(db.getId()); - if (ret != null) { - return ret; + int dbId = db.getId(); + if (dbStoreServiceMap.containsKey(dbId)) { + return dbStoreServiceMap.get(dbId); } DbType dbType = db.getDbType(); DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId()); - switch (dbTypeEnum) { - case CB: - ret = context.getBean(CouchbaseService.class, db); - break; - case ES: - ret = context.getBean(ElasticsearchService.class, db); - break; - case HDFS: - ret = context.getBean(HdfsService.class, db); - break; - case MONGO: - ret = context.getBean(MongodbService.class, db); - break; - default: + Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass(); + + if (serviceClass == null) { log.error("Should not have come here {}", db); - ret = null; + dbStoreServiceMap.put(dbId, null); + return null; } - - dbStoreServiceMap.put(db.getId(), ret); + + DbStoreService ret = context.getBean(serviceClass, db); + dbStoreServiceMap.put(dbId, ret); return ret; } - } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java index 58ee9087..2e959fa2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java @@ -39,7 +39,7 @@ public class KafkaService { @Autowired private KafkaRepository kafkaRepository; - public Kafka getKafkaById(String id) { + public Kafka getKafkaById(int id) { Optional<Kafka> ret = kafkaRepository.findById(id); return ret.isPresent() ? ret.get() : null; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java index 151ea3d6..ab99ad09 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java @@ -68,6 +68,7 @@ public class Puller implements Runnable { private final Logger log = LoggerFactory.getLogger(this.getClass()); + //KafkaConsumer is not thread-safe. private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text private boolean active = false; @@ -156,7 +157,7 @@ public class Puller implements Runnable { storeService.saveMessages(kafka, partition.topic(), messages); log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + if (!async) {//for reliability, sync commit offset to Kafka right after saving the data to data store, this slows down a bit long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 6ca8c139..a02cd6a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -61,15 +61,15 @@ public class TopicConfigPollingService implements Runnable { private KafkaRepository kafkaRepository; //effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic. - private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>(); + private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>(); //private Map<String, TopicConfig> effectiveTopicConfigMap; - //monitor Kafka topic list changes - private Map<String, Set<String>> activeTopicMap; + //monitor Kafka topic list changes, key is kafka id, value is active Topics + private Map<Integer, Set<String>> activeTopicMap; - private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version - private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version - private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService + private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version + private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version + private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService private boolean active = false; @@ -84,7 +84,7 @@ public class TopicConfigPollingService implements Runnable { } public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version - String kafkaId = kafka.getId(); + int kafkaId = kafka.getId(); int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0); @@ -125,10 +125,10 @@ public class TopicConfigPollingService implements Runnable { } try { - Map<String, Set<String>> newTopicsMap = poll(); + Map<Integer, Set<String>> newTopicsMap = poll(); - for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) { - String kafkaId = entry.getKey(); + for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) { + Integer kafkaId = entry.getKey(); Set<String> newTopics = entry.getValue(); Set<String> activeTopics = activeTopicMap.get(kafkaId); @@ -155,8 +155,8 @@ public class TopicConfigPollingService implements Runnable { active = false; } - private Map<String, Set<String>> poll() throws IOException { - Map<String, Set<String>> ret = new HashMap<>(); + private Map<Integer, Set<String>> poll() throws IOException { + Map<Integer, Set<String>> ret = new HashMap<>(); Iterable<Kafka> kafkas = kafkaRepository.findAll(); for (Kafka kafka : kafkas) { if (kafka.isEnabled()) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 73f6293f..ed9b5c20 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -172,8 +172,8 @@ public class TopicService { } Set<Kafka> relateKafka = new HashSet<>(); - if (tConfig.getSinkKafkas() != null) { - for (String item : tConfig.getSinkKafkas()) { + if (tConfig.getKafkas() != null) { + for (int item : tConfig.getKafkas()) { Optional<Kafka> sinkKafka = kafkaRepository.findById(item); if (sinkKafka.isPresent()) { relateKafka.add(sinkKafka.get()); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java index f2ac5e94..44b940a2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java @@ -66,6 +66,7 @@ public class CouchbaseService implements DbStoreService { ApplicationConfiguration config; private Db couchbase; + //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html Bucket bucket; public CouchbaseService(Db db) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java index 18b7e2fb..e303fa9b 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java @@ -73,7 +73,7 @@ public class ElasticsearchService implements DbStoreService { @Autowired private ApplicationConfiguration config; - private RestHighLevelClient client; + private RestHighLevelClient client;//thread safe ActionListener<BulkResponse> listener; public ElasticsearchService(Db db) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java index a044790e..8677c6f6 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java @@ -72,11 +72,9 @@ public class MongodbService implements DbStoreService { private ApplicationConfiguration config; private boolean dbReady = false; - //@Autowired -// private DbService dbService; - private MongoDatabase database; private MongoClient mongoClient; + //MongoCollection is ThreadSafe private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>(); private InsertManyOptions insertManyOptions; diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java index 04545a9a..bd26519b 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java @@ -69,8 +69,8 @@ public class TopicConfigPollingServiceTest { init.invoke(topicConfigPollingService); Set<String> activeTopics = new HashSet<>(Arrays.asList("test")); - Map<String, Set<String>> activeTopicMap = new HashMap<>(); - activeTopicMap.put(KAFKA_NAME, activeTopics); + Map<Integer, Set<String>> activeTopicMap = new HashMap<>(); + activeTopicMap.put(1, activeTopics); Field activeTopicsField = TopicConfigPollingService.class.getDeclaredField("activeTopicMap"); activeTopicsField.setAccessible(true); @@ -114,6 +114,7 @@ public class TopicConfigPollingServiceTest { @Test public void testGet() { Kafka kafka = TestUtil.newKafka(KAFKA_NAME); + kafka.setId(1); //assertNull(topicConfigPollingService.getEffectiveTopic (kafka, "test")); assertNotNull(topicConfigPollingService.getActiveTopics(kafka)); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java index 2ea2e835..4eebcb47 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java @@ -99,7 +99,7 @@ public class TopicServiceTest { db.setDbType(dbType); Kafka kafka = new Kafka(); - kafka.setId("1234"); + kafka.setName("1234"); kafkas.add(kafka); TopicName topicName = new TopicName(); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java index a54cfd37..770cf31b 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java @@ -45,7 +45,8 @@ public class TestUtil { public static Kafka newKafka(String name) { Kafka kafka = new Kafka(); - kafka.setId(name); + kafka.setId(i++); + kafka.setName(name); return kafka ; } |