summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java39
1 files changed, 31 insertions, 8 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 84d5f337..dc04cf60 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -21,14 +21,19 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -46,9 +51,10 @@ public class PullService {
private boolean isRunning = false;
private ExecutorService executorService;
private Thread topicConfigPollingThread;
+ private Set<Puller> pullers;
@Autowired
- private Puller puller;
+ private KafkaRepository kafkaRepository;
@Autowired
private TopicConfigPollingService topicConfigPollingService;
@@ -56,6 +62,9 @@ public class PullService {
@Autowired
private ApplicationConfiguration config;
+ @Autowired
+ private ApplicationContext context;
+
/**
* @return the isRunning
*/
@@ -73,12 +82,16 @@ public class PullService {
return;
}
- logger.info("start pulling ...");
- int numConsumers = config.getKafkaConsumerCount();
- executorService = Executors.newFixedThreadPool(numConsumers);
+ logger.info("PullService starting ...");
- for (int i = 0; i < numConsumers; i++) {
- executorService.submit(puller);
+ pullers = new HashSet<>();
+ executorService = Executors.newCachedThreadPool();
+
+ Iterable<Kafka> kafkas = kafkaRepository.findAll();
+ for (Kafka kafka : kafkas) {
+ if (kafka.isEnabled()) {
+ doKafka(kafka);
+ }
}
topicConfigPollingThread = new Thread(topicConfigPollingService);
@@ -90,6 +103,14 @@ public class PullService {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
+ private void doKafka(Kafka kafka) {
+ Puller puller = context.getBean(Puller.class, kafka);
+ pullers.add(puller);
+ for (int i = 0; i < kafka.getConsumerCount(); i++) {
+ executorService.submit(puller);
+ }
+ }
+
/**
* stop pulling
*/
@@ -101,7 +122,9 @@ public class PullService {
config.getShutdownLock().writeLock().lock();
try {
logger.info("stop pulling ...");
- puller.shutdown();
+ for (Puller puller : pullers) {
+ puller.shutdown();
+ }
logger.info("stop TopicConfigPollingService ...");
topicConfigPollingService.shutdown();
@@ -118,7 +141,7 @@ public class PullService {
} finally {
config.getShutdownLock().writeLock().unlock();
}
-
+
isRunning = false;
}