summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-06-28 14:51:10 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-06-28 14:51:10 -0700
commitaaab75304e99b08b3771fabfac08d61d27721ea5 (patch)
treede6e3b7b3ac2f595fa49171f232c15e05b627b8f /components/datalake-handler/feeder/src/main/java
parentb14c5766902d486a94a8db96d2a31ff0e9e8255e (diff)
supports multiple Kafka clusters and DBs
Read data from Kafka and store into DBs Issue-ID: DCAEGEN2-1631 Change-Id: Ib8fccfd84cfdcd2e284ba4f2503b0fbfe41eb5ae Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java17
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java52
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java50
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java13
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java13
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java5
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java4
14 files changed, 101 insertions, 110 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 806dc72e..05d6e887 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -54,23 +54,8 @@ public class ApplicationConfiguration {
private String defaultTopicName;
- private int checkTopicInterval; //in millisecond
-/*
- //DMaaP
- private String dmaapZookeeperHostPort;
- private String dmaapKafkaHostPort;
- private String dmaapKafkaGroup;
- private String dmaapKafkaLogin;
- private String dmaapKafkaPass;
- private String dmaapKafkaSecurityProtocol;
-
- private long dmaapKafkaTimeout;
- private String[] dmaapKafkaExclude;
+ private long checkTopicInterval; //in millisecond
- private int dmaapCheckNewTopicInterval; //in millisecond
-
- private int kafkaConsumerCount;
-*/
private String elasticsearchType;
//HDFS
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
index 7059cd09..2a653d89 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -101,6 +101,10 @@ public class Db {
)
private Set<Topic> topics;
+ public boolean isTool() {
+ return dbType.isTool();
+ }
+
public boolean isHdfs() {
return isDb(DbTypeEnum.HDFS);
}
@@ -127,7 +131,7 @@ public class Db {
@Override
public String toString() {
- return String.format("Db %s (name=%, enabled=%s)", id, name, enabled);
+ return String.format("Db %s (name=%s, enabled=%s)", id, name, enabled);
}
@Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
index d2189cbc..26be942a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -105,7 +105,7 @@ public class Kafka {
@Override
public String toString() {
- return String.format("Kafka %s (name=%, enabled=%s)", id, name, enabled);
+ return String.format("Kafka %s (name=%s, enabled=%s)", id, name, enabled);
}
@Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index 2e934e2e..addd0606 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -20,8 +20,21 @@
package org.onap.datalake.feeder.service;
-import org.onap.datalake.feeder.repository.DbRepository;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.DbType;
+import org.onap.datalake.feeder.enumeration.DbTypeEnum;
+import org.onap.datalake.feeder.service.db.CouchbaseService;
+import org.onap.datalake.feeder.service.db.DbStoreService;
+import org.onap.datalake.feeder.service.db.ElasticsearchService;
+import org.onap.datalake.feeder.service.db.HdfsService;
+import org.onap.datalake.feeder.service.db.MongodbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -32,7 +45,42 @@ import org.springframework.stereotype.Service;
*/
@Service
public class DbService {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private DbRepository dbRepository;
+ private ApplicationContext context;
+
+ private Map<Integer, DbStoreService> dbStoreServiceMap = new HashMap<>();
+
+ public DbStoreService findDbStoreService(Db db) {
+ DbStoreService ret = dbStoreServiceMap.get(db.getId());
+ if (ret != null) {
+ return ret;
+ }
+
+ DbType dbType = db.getDbType();
+ DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
+ switch (dbTypeEnum) {
+ case CB:
+ ret = context.getBean(CouchbaseService.class, db);
+ break;
+ case ES:
+ ret = context.getBean(ElasticsearchService.class, db);
+ break;
+ case HDFS:
+ ret = context.getBean(HdfsService.class, db);
+ break;
+ case MONGO:
+ ret = context.getBean(MongodbService.class, db);
+ break;
+ default:
+ log.error("Should not have come here {}", db);
+ ret = null;
+ }
+
+ dbStoreServiceMap.put(db.getId(), ret);
+
+ return ret;
+ }
+
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 1bfd437f..671234ba 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -21,7 +21,6 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -32,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -39,10 +39,10 @@ import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -52,6 +52,7 @@ import org.springframework.stereotype.Service;
*
*/
@Service
+@Scope("prototype")
public class DmaapService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -145,8 +146,10 @@ public class DmaapService {
log.debug("get topic setting from DB: {}.", topicStr);
List<EffectiveTopic> effectiveTopics= topicService.getEnabledEffectiveTopic(kafka, topicStr, true);
-
- ret.put(topicStr , effectiveTopics);
+ if(CollectionUtils.isNotEmpty(effectiveTopics )) {
+ log.debug("add effectiveTopics {}:{}.", topicStr, effectiveTopics);
+ ret.put(topicStr , effectiveTopics);
+ }
}
return ret;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 65de0bdc..09a59ee3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -50,7 +50,6 @@ public class PullService {
private boolean isRunning = false;
private ExecutorService executorService;
-// private Thread topicConfigPollingThread;
private Set<Puller> pullers;
@Autowired
@@ -95,10 +94,7 @@ public class PullService {
}
executorService.submit(topicConfigPollingService);
- /*topicConfigPollingThread = new Thread(topicConfigPollingService);
- topicConfigPollingThread.setName("TopicConfigPolling");
- topicConfigPollingThread.start();
-*/
+
isRunning = true;
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
@@ -127,11 +123,6 @@ public class PullService {
puller.shutdown();
}
-// logger.info("stop TopicConfigPollingService ...");
-// topicConfigPollingService.shutdown();
-
- // topicConfigPollingThread.join();
-
logger.info("stop executorService ...");
executorService.shutdown();
executorService.awaitTermination(120L, TimeUnit.SECONDS);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 1550e531..151ea3d6 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -43,6 +43,7 @@ import org.onap.datalake.feeder.domain.Kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -53,6 +54,7 @@ import org.springframework.stereotype.Service;
*/
@Service
+@Scope("prototype")
public class Puller implements Runnable {
@Autowired
@@ -72,10 +74,7 @@ public class Puller implements Runnable {
private boolean async;
private Kafka kafka;
-
- public Puller( ) {
-
- }
+
public Puller(Kafka kafka) {
this.kafka = kafka;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index f5a7698d..0e54b9b5 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -35,22 +35,14 @@ import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.DbType;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
-import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.onap.datalake.feeder.enumeration.DbTypeEnum;
-import org.onap.datalake.feeder.service.db.CouchbaseService;
import org.onap.datalake.feeder.service.db.DbStoreService;
-import org.onap.datalake.feeder.service.db.ElasticsearchService;
-import org.onap.datalake.feeder.service.db.HdfsService;
-import org.onap.datalake.feeder.service.db.MongodbService;
import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -73,7 +65,7 @@ public class StoreService {
private ApplicationConfiguration config;
@Autowired
- private ApplicationContext context;
+ private DbService dbService;
@Autowired
private TopicConfigPollingService configPollingService;
@@ -91,11 +83,11 @@ public class StoreService {
}
Collection<EffectiveTopic> effectiveTopics = configPollingService.getEffectiveTopic(kafka, topicStr);
- for(EffectiveTopic effectiveTopic:effectiveTopics) {
+ for (EffectiveTopic effectiveTopic : effectiveTopics) {
saveMessagesForTopic(effectiveTopic, messages);
}
}
-
+
private void saveMessagesForTopic(EffectiveTopic effectiveTopic, List<Pair<Long, String>> messages) {
if (!effectiveTopic.getTopic().isEnabled()) {
log.error("we should not come here {}", effectiveTopic);
@@ -116,11 +108,13 @@ public class StoreService {
Set<Db> dbs = effectiveTopic.getTopic().getDbs();
for (Db db : dbs) {
- if (db.getDbType().isTool() || !db.isEnabled()) {
+ if (db.isTool() || db.isDruid() || !db.isEnabled()) {
continue;
}
- DbStoreService dbStoreService = findDbStoreService(db);
- dbStoreService.saveJsons(effectiveTopic, docs);
+ DbStoreService dbStoreService = dbService.findDbStoreService(db);
+ if (dbStoreService != null) {
+ dbStoreService.saveJsons(effectiveTopic, docs);
+ }
}
}
@@ -129,12 +123,6 @@ public class StoreService {
long timestamp = pair.getLeft();
String text = pair.getRight();
- //for debug, to be remove
- // String topicStr = topic.getId();
- // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
- // log.debug("{} ={}", topicStr, text);
- //}
-
boolean storeRaw = effectiveTopic.getTopic().isSaveRaw();
JSONObject json = null;
@@ -187,29 +175,11 @@ public class StoreService {
return json;
}
- private DbStoreService findDbStoreService(Db db) {
- DbType dbType = db.getDbType();
- DbTypeEnum dbTypeEnum = DbTypeEnum.valueOf(dbType.getId());
- switch (dbTypeEnum) {
- case CB:
- return context.getBean(CouchbaseService.class, db);
- case ES:
- return context.getBean(ElasticsearchService.class, db);
- case HDFS:
- return context.getBean(HdfsService.class, db);
- case MONGO:
- return context.getBean(MongodbService.class, db);
- default:
- log.error("we should not come here {}", dbTypeEnum);
- return null;
- }
- }
-
public void flush() { //force flush all buffer
-// hdfsService.flush();
+ // hdfsService.flush();
}
public void flushStall() { //flush stall buffer
- // hdfsService.flushStall();
+ // hdfsService.flushStall();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 8f703b1d..6ca8c139 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -61,14 +61,15 @@ public class TopicConfigPollingService implements Runnable {
private KafkaRepository kafkaRepository;
//effectiveTopic Map, 1st key is kafkaId, 2nd is topic name, the value is a list of EffectiveTopic.
- private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();;
+ private Map<String, Map<String, List<EffectiveTopic>>> effectiveTopicMap = new HashMap<>();
//private Map<String, TopicConfig> effectiveTopicConfigMap;
//monitor Kafka topic list changes
private Map<String, Set<String>> activeTopicMap;
- private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = new ThreadLocal<>();
- private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();
+ private ThreadLocal<Map<String, Integer>> activeTopicsVersionLocal = ThreadLocal.withInitial(HashMap::new);//topic name:version
+ private Map<String, Integer> currentActiveTopicsVersionMap = new HashMap<>();//topic name:version
+ private Map<String, DmaapService> dmaapServiceMap = new HashMap<>();//kafka id:DmaapService
private boolean active = false;
@@ -169,7 +170,11 @@ public class TopicConfigPollingService implements Runnable {
private Set<String> poll(Kafka kafka) throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
- DmaapService dmaapService = context.getBean(DmaapService.class, kafka);
+ DmaapService dmaapService = dmaapServiceMap.get(kafka.getId());
+ if(dmaapService==null) {
+ dmaapService = context.getBean(DmaapService.class, kafka);
+ dmaapServiceMap.put(kafka.getId(), dmaapService);
+ }
Map<String, List<EffectiveTopic>> activeEffectiveTopics = dmaapService.getActiveEffectiveTopic();
effectiveTopicMap.put(kafka.getId(), activeEffectiveTopics);
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 86b27a9a..645160e2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -41,7 +41,6 @@ import org.onap.datalake.feeder.service.db.ElasticsearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -59,9 +58,6 @@ public class TopicService {
private ApplicationConfiguration config;
@Autowired
- private ApplicationContext context;
-
- @Autowired
private TopicNameRepository topicNameRepository;
@Autowired
@@ -70,6 +66,9 @@ public class TopicService {
@Autowired
private DbRepository dbRepository;
+ @Autowired
+ private DbService dbService;
+
public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
List<Topic> topics = findTopics(kafka, topicStr);
@@ -88,7 +87,7 @@ public class TopicService {
if (ensureTableExist) {
for (Db db : topic.getDbs()) {
if (db.isElasticsearch()) {
- ElasticsearchService elasticsearchService = context.getBean(ElasticsearchService.class, db);
+ ElasticsearchService elasticsearchService = (ElasticsearchService) dbService.findDbStoreService(db);
elasticsearchService.ensureTableExist(topicStr);
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
index 33c8847e..bd2d9715 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/CouchbaseService.java
@@ -35,6 +35,7 @@ import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.couchbase.client.java.Bucket;
@@ -56,6 +57,7 @@ import rx.functions.Func1;
*
*/
@Service
+@Scope("prototype")
public class CouchbaseService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -64,17 +66,8 @@ public class CouchbaseService implements DbStoreService {
ApplicationConfiguration config;
private Db couchbase;
-/*
- @Autowired
- private DbService dbService;
-
- private boolean isReady = false;
-*/
Bucket bucket;
-
- public CouchbaseService( ) {
-
- }
+
public CouchbaseService(Db db) {
couchbase = db;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
index aee63ed7..4dfcdd22 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/ElasticsearchService.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -62,6 +63,7 @@ import org.springframework.stereotype.Service;
*
*/
@Service
+@Scope("prototype")
public class ElasticsearchService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -71,15 +73,9 @@ public class ElasticsearchService implements DbStoreService {
@Autowired
private ApplicationConfiguration config;
- //@Autowired
-// private DbService dbService;
-
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
-
- public ElasticsearchService( ) {
-
- }
+
public ElasticsearchService(Db db) {
elasticsearch = db;
}
@@ -88,7 +84,6 @@ public class ElasticsearchService implements DbStoreService {
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
private void init() {
- //Db elasticsearch = dbService.getElasticsearch();
String elasticsearchHost = elasticsearch.getHost();
// Initialize the Connection
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
index 0e107fdf..ea0e77aa 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/HdfsService.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import lombok.Getter;
@@ -59,6 +60,7 @@ import lombok.Setter;
*
*/
@Service
+@Scope("prototype")
public class HdfsService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -152,9 +154,6 @@ public class HdfsService implements DbStoreService {
}
}
- public HdfsService( ) {
- }
-
public HdfsService(Db db) {
hdfs = db;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
index 0f522f6b..5cc4070d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/db/MongodbService.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import com.mongodb.bulk.BulkWriteError;
@@ -59,6 +60,7 @@ import com.mongodb.client.model.InsertManyOptions;
*
*/
@Service
+@Scope("prototype")
public class MongodbService implements DbStoreService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -77,8 +79,6 @@ public class MongodbService implements DbStoreService {
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
private InsertManyOptions insertManyOptions;
- public MongodbService( ) {
- }
public MongodbService(Db db) {
mongodb = db;
}