summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-07-10 01:34:20 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-07-10 01:34:20 -0700
commite5c87e6ddfc5e727b1b4d11e994ad242fd366534 (patch)
tree5802f76f19549f08a7513e7e2673cfdd3177ce81 /components/datalake-handler/feeder/src
parent7a094862a6a679b022bf777d3f15e32a048ba4d8 (diff)
supports multiple Kafka clusters and DBs
Change kafka table id to int Issue-ID: DCAEGEN2-1631 Change-Id: Ib5109b75a387f76709dc38161bf2d7ef084950ef Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src')
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql8
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java21
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java36
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java3
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java24
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java4
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java5
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java2
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java3
21 files changed, 76 insertions, 74 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index 02f2343c..1fd9aa84 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -32,7 +32,7 @@ CREATE TABLE `db` (
PRIMARY KEY (`id`),
KEY `FK3njadtw43ieph7ftt4kxdhcko` (`db_type_id`),
CONSTRAINT `FK3njadtw43ieph7ftt4kxdhcko` FOREIGN KEY (`db_type_id`) REFERENCES `db_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `portal` (
`name` varchar(255) NOT NULL,
@@ -73,10 +73,10 @@ CREATE TABLE `design` (
KEY `FKabb8e74230glxpaiai4aqsr34` (`topic_name_id`),
CONSTRAINT `FKabb8e74230glxpaiai4aqsr34` FOREIGN KEY (`topic_name_id`) REFERENCES `topic_name` (`id`),
CONSTRAINT `FKo43yi6aputq6kwqqu8eqbspm5` FOREIGN KEY (`design_type_id`) REFERENCES `design_type` (`id`)
-) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `kafka` (
- `id` varchar(255) NOT NULL,
+ `id` int(11) NOT NULL AUTO_INCREMENT,
`broker_list` varchar(255) NOT NULL,
`consumer_count` int(11) DEFAULT 3,
`enabled` bit(1) NOT NULL,
@@ -130,7 +130,7 @@ CREATE TABLE `map_db_topic` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `map_kafka_topic` (
- `kafka_id` varchar(255) NOT NULL,
+ `kafka_id` int(11) NOT NULL,
`topic_id` int(11) NOT NULL,
PRIMARY KEY (`topic_id`,`kafka_id`),
KEY `FKtdrme4h7rxfh04u2i2wqu23g5` (`kafka_id`),
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql
index 0605e0e9..770c68bf 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db_data.sql
@@ -13,7 +13,7 @@ INSERT INTO datalake.kafka(
,timeout_sec
,zk
) VALUES (
- 'KAFKA_1'
+ 1
,'main Kafka cluster' -- name - IN varchar(255)
,3 -- consumer_count - IN int(11)
,1 -- enabled - IN bit(1)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
index 901adf29..8d1bf316 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
@@ -87,7 +87,7 @@ public class KafkaController {
@PutMapping("/{id}")
@ResponseBody
@ApiOperation(value="Update a kafka.")
- public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException {
+ public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable int id, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
@@ -116,7 +116,7 @@ public class KafkaController {
@DeleteMapping("/{id}")
@ResponseBody
@ApiOperation(value="delete a kafka.")
- public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{
+ public void deleteKafka(@PathVariable("id") int id, HttpServletResponse response) throws IOException{
Kafka oldKafka = kafkaService.getKafkaById(id);
if (oldKafka == null) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 1162aedd..bb0de4b0 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -90,7 +90,7 @@ public class TopicController {
@GetMapping("/dmaap/{kafkaId}")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics(@PathVariable("kafkaId") String kafkaId ) {
+ public List<String> listDmaapTopics(@PathVariable("kafkaId") int kafkaId ) {
Kafka kafka = kafkaRepository.findById(kafkaId).get();
DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
return dmaapService.getTopics();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
index 2741c638..7f7b59e8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -24,6 +24,8 @@ import java.util.Set;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
@@ -47,8 +49,9 @@ import org.onap.datalake.feeder.dto.KafkaConfig;
@Table(name = "kafka")
public class Kafka {
@Id
- @Column(name="`id`")
- private String id;
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ @Column(name = "`id`")
+ private int id;
@Column(name="`name`", nullable = false)
private String name;
@@ -117,12 +120,12 @@ public class Kafka {
if (this.getClass() != obj.getClass())
return false;
- return id.equals(((Kafka) obj).getId());
+ return id == ((Kafka) obj).getId();
}
@Override
public int hashCode() {
- return id.hashCode();
+ return id;
}
public KafkaConfig getKafkaConfig() {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 13e0163e..5d0c7625 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -213,13 +213,13 @@ public class Topic {
tConfig.setEnabledSinkdbs(enabledDbList);
Set<Kafka> topicKafka = getKafkas();
- List<String> kafkaList = new ArrayList<>();
+ List<Integer> kafkaList = new ArrayList<>();
if (topicKafka != null) {
for (Kafka kafka : topicKafka) {
kafkaList.add(kafka.getId());
}
}
- tConfig.setSinkKafkas(kafkaList);
+ tConfig.setKafkas(kafkaList);
return tConfig;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
index b158d167..f5e9539c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
@@ -23,8 +23,6 @@ package org.onap.datalake.feeder.dto;
import lombok.Getter;
import lombok.Setter;
-import java.util.List;
-
/**
* JSON request body for Kafka Config.
*
@@ -35,7 +33,7 @@ import java.util.List;
@Setter
public class KafkaConfig {
- private String id;
+ private int id;
private String name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index a51103b7..6a262ca8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -25,10 +25,6 @@ import lombok.Setter;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
-import org.onap.datalake.feeder.enumeration.DataFormat;
-
/**
* JSON request body for Topic manipulation.
*
@@ -55,7 +51,7 @@ public class TopicConfig {
private String messageIdPath;
private String aggregateArrayPath;
private String flattenArrayPath;
- private List<String> sinkKafkas;
+ private List<Integer> kafkas;
@Override
public String toString() {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
index 05d76d55..39d02d36 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DbTypeEnum.java
@@ -19,6 +19,12 @@
*/
package org.onap.datalake.feeder.enumeration;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+
/**
* Database type
*
@@ -26,12 +32,23 @@ package org.onap.datalake.feeder.enumeration;
*
*/
public enum DbTypeEnum {
- CB("Couchbase"), DRUID("Druid"), ES("Elasticsearch"), HDFS("HDFS"), MONGO("MongoDB"), KIBANA("Kibana"), SUPERSET("Superset");
+ CB("Couchbase", CouchbaseService.class)
+ , DRUID("Druid", null)
+ , ES("Elasticsearch", ElasticsearchService.class)
+ , HDFS("HDFS", HdfsService.class)
+ , MONGO("MongoDB", MongodbService.class)
+ , KIBANA("Kibana", null)
+ , SUPERSET("Superset", null);
private final String name;
+ private final Class<? extends DbStoreService> serviceClass;
- DbTypeEnum(String name) {
+ DbTypeEnum(String name, Class<? extends DbStoreService> serviceClass) {
this.name = name;
+ this.serviceClass = serviceClass;
}
+ public Class<? extends DbStoreService> getServiceClass(){
+ return serviceClass;
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
index 8e78e5c2..6ce23ba7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/KafkaRepository.java
@@ -30,6 +30,6 @@ import org.springframework.data.repository.CrudRepository;
*
*/
-public interface KafkaRepository extends CrudRepository<Kafka, String> {
+public interface KafkaRepository extends CrudRepository<Kafka, Integer> {
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index addd0606..d54bf3f4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -26,11 +26,7 @@ import java.util.Map;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,34 +49,24 @@ public class DbService {
private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
public DbStoreService findDbStoreService(Db db) {
- DbStoreService ret = dbStoreServiceMap.get(db.getId());
- if (ret != null) {
- return ret;
+ int dbId = db.getId();
+ if (dbStoreServiceMap.containsKey(dbId)) {
+ return dbStoreServiceMap.get(dbId);
}
DbType dbType = db.getDbType();
DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
- switch (dbTypeEnum) {
- case CB:
- ret = context.getBean(CouchbaseService.class, db);
- break;
- case ES:
- ret = context.getBean(ElasticsearchService.class, db);
- break;
- case HDFS:
- ret = context.getBean(HdfsService.class, db);
- break;
- case MONGO:
- ret = context.getBean(MongodbService.class, db);
- break;
- default:
+ Class<? extends DbStoreService> serviceClass = dbTypeEnum.getServiceClass();
+
+ if (serviceClass == null) {
log.error("Should not have come here {}", db);
- ret = null;
+ dbStoreServiceMap.put(dbId, null);
+ return null;
}
-
- dbStoreServiceMap.put(db.getId(), ret);
+
+ DbStoreService ret = context.getBean(serviceClass, db);
+ dbStoreServiceMap.put(dbId, ret);
return ret;
}
-
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
index 58ee9087..2e959fa2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
@@ -39,7 +39,7 @@ public class KafkaService {
@Autowired
private KafkaRepository kafkaRepository;
- public Kafka getKafkaById(String id) {
+ public Kafka getKafkaById(int id) {
Optional<Kafka> ret = kafkaRepository.findById(id);
return ret.isPresent() ? ret.get() : null;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 151ea3d6..ab99ad09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -68,6 +68,7 @@ public class Puller implements Runnable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
+ //KafkaConsumer is not thread-safe.
private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text
private boolean active = false;
@@ -156,7 +157,7 @@ public class Puller implements Runnable {
storeService.saveMessages(kafka, partition.topic(), messages);
log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ if (!async) {//for reliability, sync commit offset to Kafka right after saving the data to data store, this slows down a bit
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 6ca8c139..a02cd6a2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -61,15 +61,15 @@ public class TopicConfigPollingService implements Runnable {
private KafkaRepository kafkaRepository;
//effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
- private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
+ private Map<Integer, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
//private Map<String, TopicConfig> effectiveTopicConfigMap;
- //monitor Kafka topic list changes
- private Map<String, Set<String>> activeTopicMap;
+ //monitor Kafka topic list changes, key is kafka id, value is active Topics
+ private Map<Integer, Set<String>> activeTopicMap;
- private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version
- private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
- private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
+ private ThreadLocal<Map<Integer, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//kafkaId:version - local 'old' version
+ private Map<Integer, Integer> currentActiveTopicsVersionMap = new HashMap<>();//kafkaId:version - current/latest version
+ private Map<Integer, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
private boolean active = false;
@@ -84,7 +84,7 @@ public class TopicConfigPollingService implements Runnable {
}
public boolean isActiveTopicsChanged(Kafka kafka) {//update=true means sync local version
- String kafkaId = kafka.getId();
+ int kafkaId = kafka.getId();
int currentActiveTopicsVersion = currentActiveTopicsVersionMap.getOrDefault(kafkaId, 1);//init did one version
int localActiveTopicsVersion = activeTopicsVersionLocal.get().getOrDefault(kafkaId, 0);
@@ -125,10 +125,10 @@ public class TopicConfigPollingService implements Runnable {
}
try {
- Map<String, Set<String>> newTopicsMap = poll();
+ Map<Integer, Set<String>> newTopicsMap = poll();
- for(Map.Entry<String, Set<String>> entry:newTopicsMap.entrySet()) {
- String kafkaId = entry.getKey();
+ for(Map.Entry<Integer, Set<String>> entry:newTopicsMap.entrySet()) {
+ Integer kafkaId = entry.getKey();
Set<String> newTopics = entry.getValue();
Set<String> activeTopics = activeTopicMap.get(kafkaId);
@@ -155,8 +155,8 @@ public class TopicConfigPollingService implements Runnable {
active = false;
}
- private Map<String, Set<String>> poll() throws IOException {
- Map<String, Set<String>> ret = new HashMap<>();
+ private Map<Integer, Set<String>> poll() throws IOException {
+ Map<Integer, Set<String>> ret = new HashMap<>();
Iterable<Kafka> kafkas = kafkaRepository.findAll();
for (Kafka kafka : kafkas) {
if (kafka.isEnabled()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 73f6293f..ed9b5c20 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -172,8 +172,8 @@ public class TopicService {
}
Set<Kafka> relateKafka = new HashSet<>();
- if (tConfig.getSinkKafkas() != null) {
- for (String item : tConfig.getSinkKafkas()) {
+ if (tConfig.getKafkas() != null) {
+ for (int item : tConfig.getKafkas()) {
Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
if (sinkKafka.isPresent()) {
relateKafka.add(sinkKafka.get());
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index f2ac5e94..44b940a2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -66,6 +66,7 @@ public class CouchbaseService implements DbStoreService {
ApplicationConfiguration config;
private Db couchbase;
+ //Bucket is thread-safe. https://docs.couchbase.com/java-sdk/current/managing-connections.html
Bucket bucket;
public CouchbaseService(Db db) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index 18b7e2fb..e303fa9b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -73,7 +73,7 @@ public class ElasticsearchService implements DbStoreService {
@Autowired
private ApplicationConfiguration config;
- private RestHighLevelClient client;
+ private RestHighLevelClient client;//thread safe
ActionListener<BulkResponse> listener;
public ElasticsearchService(Db db) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index a044790e..8677c6f6 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -72,11 +72,9 @@ public class MongodbService implements DbStoreService {
private ApplicationConfiguration config;
private boolean dbReady = false;
- //@Autowired
-// private DbService dbService;
-
private MongoDatabase database;
private MongoClient mongoClient;
+ //MongoCollection is ThreadSafe
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java
index 04545a9a..bd26519b 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicConfigPollingServiceTest.java
@@ -69,8 +69,8 @@ public class TopicConfigPollingServiceTest {
init.invoke(topicConfigPollingService);
Set<String> activeTopics = new HashSet<>(Arrays.asList("test"));
- Map<String, Set<String>> activeTopicMap = new HashMap<>();
- activeTopicMap.put(KAFKA_NAME, activeTopics);
+ Map<Integer, Set<String>> activeTopicMap = new HashMap<>();
+ activeTopicMap.put(1, activeTopics);
Field activeTopicsField = TopicConfigPollingService.class.getDeclaredField("activeTopicMap");
activeTopicsField.setAccessible(true);
@@ -114,6 +114,7 @@ public class TopicConfigPollingServiceTest {
@Test
public void testGet() {
Kafka kafka = TestUtil.newKafka(KAFKA_NAME);
+ kafka.setId(1);
//assertNull(topicConfigPollingService.getEffectiveTopic (kafka, "test"));
assertNotNull(topicConfigPollingService.getActiveTopics(kafka));
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 2ea2e835..4eebcb47 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -99,7 +99,7 @@ public class TopicServiceTest {
db.setDbType(dbType);
Kafka kafka = new Kafka();
- kafka.setId("1234");
+ kafka.setName("1234");
kafkas.add(kafka);
TopicName topicName = new TopicName();
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java
index a54cfd37..770cf31b 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/TestUtil.java
@@ -45,7 +45,8 @@ public class TestUtil {
public static Kafka newKafka(String name) {
Kafka kafka = new Kafka();
- kafka.setId(name);
+ kafka.setId(i++);
+ kafka.setName(name);
return kafka ;
}