authorGuobiao Mo <guobiaomo@chinamobile.com>2019-05-26 23:08:21 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-05-26 23:08:21 -0700
commitaa70d5683655fd13d476441c056c877f76796a7c (patch)
treeefb8994c328e0f0c9fe0c1627be64366f84b4f03 /components/datalake-handler/feeder/src/main/java/org
parentd564502aa81ecc64432c6afbd5c18a18bfad9c6b (diff)
Dynamically detect topic updates and new topics
Issue-ID: DCAEGEN2-1195
Change-Id: I35d36a9aafe3a7681a9d4745bc509aded111b29d
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
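The core of the change is a version-counter handshake between the new TopicConfigPollingService and the Kafka consumer threads: the polling thread periodically fetches the active topic list, bumps a version number only when the list actually differs, and each Puller thread re-subscribes only when its own last-seen version lags behind. As a standalone illustration of that pattern — a minimal sketch with purely illustrative names, not the ONAP classes added in the diff below:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the pattern used by
// TopicConfigPollingService/Puller; names here are illustrative only.
class TopicChangeTracker {

    // written only by the polling thread, read by consumer threads
    private volatile List<String> activeTopics = Collections.emptyList();
    private final AtomicInteger version = new AtomicInteger(-1);

    // each consumer thread remembers the last version it subscribed against
    private final ThreadLocal<Integer> seenVersion = ThreadLocal.withInitial(() -> -1);

    // polling thread: publish a fresh topic list, bump the version only on a real change
    // (the real service compares with CollectionUtils.isEqualCollection, i.e. order-insensitive;
    // plain equals() is enough for this sketch)
    void update(List<String> newTopics) {
        if (!newTopics.equals(activeTopics)) {
            activeTopics = newTopics;
            version.incrementAndGet();
        }
    }

    // consumer thread: true means "re-subscribe now"; markSeen records the new version locally
    boolean changed(boolean markSeen) {
        boolean behind = version.get() > seenVersion.get();
        if (behind && markSeen) {
            seenVersion.set(version.get());
        }
        return behind;
    }

    List<String> topics() {
        return activeTopics;
    }

    public static void main(String[] args) {
        TopicChangeTracker tracker = new TopicChangeTracker();

        tracker.update(Arrays.asList("TOPIC_A", "TOPIC_B"));   // poller sees two topics
        if (tracker.changed(true)) {                            // consumer notices, re-subscribes
            System.out.println("subscribe to " + tracker.topics());
        }

        tracker.update(Arrays.asList("TOPIC_A", "TOPIC_B"));   // same list again: no version bump
        System.out.println("changed again? " + tracker.changed(true)); // false
    }
}

Because the polling thread is the only writer, an atomic version counter plus a volatile reference to the list is enough to publish changes to the consumer threads; each consumer pays the cost of consumer.subscribe() only when its ThreadLocal version has actually fallen behind, which is what the diff below implements in Puller.run() and TopicConfigPollingService.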
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java  |   4
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java  |  31
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java  |  23
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java  |  68
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java  |   8
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java  |  42
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java  |  31
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java (renamed from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java)  | 111
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java  |  32
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java  | 141
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java  |   4
11 files changed, 354 insertions(+), 141 deletions(-)
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 2a72a76f..73067182 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -57,7 +57,7 @@ public class ApplicationConfiguration {
private long dmaapKafkaTimeout;
private String[] dmaapKafkaExclude;
- private int dmaapCheckNewTopicIntervalInSec;
+ private int dmaapCheckNewTopicInterval; //in milliseconds
private int kafkaConsumerCount;
@@ -69,5 +69,5 @@ public class ApplicationConfiguration {
private int hdfsBatchSize;
//Version
- private String DatalakeVersion;
+ private String datalakeVersion;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
index fdcbdfc1..b7bf513c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
@@ -26,21 +26,20 @@ package org.onap.datalake.feeder.enumeration;
*
*/
public enum DataFormat {
- JSON, XML, YAML, TEXT;
+ JSON("JSON"), XML("XML"), YAML("YAML"), TEXT("TEXT");
- public static DataFormat fromString(String s) {
- if ("JSON".equalsIgnoreCase(s)) {
- return JSON;
- }
- if ("XML".equalsIgnoreCase(s)) {
- return XML;
- }
- if ("YAML".equalsIgnoreCase(s)) {
- return YAML;
- }
- if ("TEXT".equalsIgnoreCase(s)) {
- return TEXT;
- }
- throw new IllegalArgumentException("Invalid value for format: " + s);
- }
+ private final String name;
+
+ DataFormat(String name) {
+ this.name = name;
+ }
+
+ public static DataFormat fromString(String s) {
+ for (DataFormat df : DataFormat.values()) {
+ if (df.name.equalsIgnoreCase(s)) {
+ return df;
+ }
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 7c237766..d7d5f873 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -32,7 +33,6 @@ import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -40,10 +40,10 @@ import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
-import com.couchbase.client.java.document.JsonLongDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import rx.Observable;
import rx.functions.Func1;
@@ -83,7 +83,7 @@ public class CouchbaseService {
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
- log.info("Connected to Couchbase {}", couchbase.getHost());
+ log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin());
isReady = true;
} catch (Exception ex) {
log.error("error connection to Couchbase.", ex);
@@ -113,6 +113,16 @@ public class CouchbaseService {
}
try {
saveDocuments(documents);
+ } catch (DocumentAlreadyExistsException e) {
+ log.error("Some or all the following ids are duplicate.");
+ for(JsonDocument document : documents) {
+ log.error("saveJsons() DocumentAlreadyExistsException {}", document.id());
+ }
+ } catch (rx.exceptions.CompositeException e) {
+ List<Throwable> causes = e.getExceptions();
+ for(Throwable cause : causes) {
+ log.error("saveJsons() CompositeException cause {}", cause.getMessage());
+ }
} catch (Exception e) {
log.error("error saving to Couchbase.", e);
}
@@ -127,14 +137,15 @@ public class CouchbaseService {
}
String topicStr = topic.getName();
- //String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
+ id = topicStr+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
//atomically get the next sequence number:
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
- JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- id = topicStr + ":" + nextIdNumber.content();
+ //sometimes this gives java.util.concurrent.TimeoutException
+ //JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
+ //id = topicStr + ":" + nextIdNumber.content();
return id;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 02aa6264..2274ce99 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -25,9 +25,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
@@ -53,39 +58,78 @@ public class DmaapService {
@Autowired
private TopicService topicService;
+ ZooKeeper zk;
+
+ @PreDestroy
+ public void cleanUp() throws InterruptedException {
+ zk.close();
+ }
+
+ @PostConstruct
+ private void init() throws IOException, InterruptedException {
+ zk = connect(config.getDmaapZookeeperHostPort());
+ }
+
//get all topic names from Zookeeper
public List<String> getTopics() {
try {
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // TODO monitor new topics
-
- }
- };
- ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
+ if (zk == null) {
+ zk = connect(config.getDmaapZookeeperHostPort());
+ }
+ log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
List<String> topics = zk.getChildren("/brokers/topics", false);
String[] excludes = config.getDmaapKafkaExclude();
topics.removeAll(Arrays.asList(excludes));
- zk.close();
+ log.info("list of topics: {}", topics);
return topics;
} catch (Exception e) {
+ zk = null;
log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
return Collections.emptyList();
}
}
- public List<String> getActiveTopics() throws IOException {
+ private ZooKeeper connect(String host) throws IOException, InterruptedException {
+ log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+ CountDownLatch connectedSignal = new CountDownLatch(1);
+ ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
+ public void process(WatchedEvent we) {
+ if (we.getState() == KeeperState.SyncConnected) {
+ connectedSignal.countDown();
+ }
+ }
+ });
+
+ connectedSignal.await();
+ return ret;
+ }
+
+ /*
+ public List<String> getActiveTopics() throws IOException {
+ log.debug("entering getActiveTopics()...");
+
+ List<TopicConfig> configList = getActiveTopicConfigs();
+
+ List<String> ret = new ArrayList<>(configList.size());
+ configList.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+
+ return ret;
+ }
+ */
+ public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+ log.debug("entering getActiveTopicConfigs()...");
List<String> allTopics = getTopics();
if (allTopics == null) {
return Collections.emptyList();
}
- List<String> ret = new ArrayList<>(allTopics.size());
+ List<TopicConfig> ret = new ArrayList<>(allTopics.size());
for (String topicStr : allTopics) {
+ log.debug("get topic setting from DB: {}.", topicStr);
+
TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
if (topicConfig.isEnabled()) {
- ret.add(topicStr);
+ ret.add(topicConfig);
}
}
return ret;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index f1bed604..2806e48b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -55,7 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service to use Elasticsearch
+ * Elasticsearch Service for table creation, data submission, as well as data pre-processing.
*
* @author Guobiao Mo
*
@@ -172,11 +172,7 @@ public class ElasticsearchService {
String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
String specificProblem = json.query("/event/faultFields/specificProblem").toString();
- String id = null;
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
-
- id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+ String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb"
String index = topic.getName().toLowerCase();
//get
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
index edbc6757..135a2c09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
@@ -21,6 +21,7 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
+import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,31 +87,44 @@ public class HdfsService {
public void flush(String topic) {
try {
- saveMessages(topic, data);
- data.clear();
- lastFlush = System.currentTimeMillis();
- log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+ if (!data.isEmpty()) {
+ saveMessages(topic, data);
+ data.clear();
+ lastFlush = System.currentTimeMillis();
+ }
} catch (IOException e) {
log.error("error saving to HDFS." + topic, e);
}
}
public void flushStall(String topic) {
- if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+ if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
flush(topic);
}
}
+ public void addData(List<Pair<Long, String>> messages) {
+ if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+ lastFlush = System.currentTimeMillis();
+ }
+
+ messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+ }
+
private void saveMessages(String topic, List<String> bufferList) throws IOException {
- String thread = Thread.currentThread().getName();
+ long thread = Thread.currentThread().getId();
Date date = new Date();
String day = dayFormat.get().format(date);
String time = timeFormat.get().format(date);
- String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+
+ InetAddress inetAddress = InetAddress.getLocalHost();
+ String hostName = inetAddress.getHostName();
+
+ String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
Path path = new Path(filePath);
- log.debug("writing to HDFS {}", filePath);
+ log.debug("writing {} to HDFS {}", bufferList.size(), filePath);
// Create a new file and write data to it.
FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
@@ -140,9 +155,8 @@ public class HdfsService {
String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
hdfsConfig.set("fs.defaultFS", hdfsuri);
- //hdfsConfig.set("hadoop.job.ugi", hdfs.getLogin());
System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());
-
+
log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());
fileSystem = FileSystem.get(hdfsConfig);
@@ -179,12 +193,12 @@ public class HdfsService {
Map<String, Buffer> bufferMap = bufferLocal.get();
final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
- List<String> bufferData = buffer.getData();
-
- messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+ buffer.addData(messages);
- if (bufferData.size() >= config.getHdfsBatchSize()) {
+ if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
buffer.flush(topicStr);
+ } else {
+ log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 48d167b5..7ed88797 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -21,8 +21,6 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -31,7 +29,6 @@ import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
/**
@@ -48,12 +45,15 @@ public class PullService {
private boolean isRunning = false;
private ExecutorService executorService;
- private List<PullThread> consumers;
+ private Thread topicConfigPollingThread;
@Autowired
- private ApplicationContext context;
+ private Puller puller;
@Autowired
+ private TopicConfigPollingService topicConfigPollingService;
+
+ @Autowired
private ApplicationConfiguration config;
/**
@@ -74,17 +74,16 @@ public class PullService {
}
logger.info("start pulling ...");
-
int numConsumers = config.getKafkaConsumerCount();
-
executorService = Executors.newFixedThreadPool(numConsumers);
- consumers = new ArrayList<>(numConsumers);
for (int i = 0; i < numConsumers; i++) {
- PullThread puller = context.getBean(PullThread.class, i);
- consumers.add(puller);
executorService.submit(puller);
}
+
+ topicConfigPollingThread = new Thread(topicConfigPollingService);
+ topicConfigPollingThread.setName("TopicConfigPolling");
+ topicConfigPollingThread.start();
isRunning = true;
@@ -100,14 +99,16 @@ public class PullService {
}
logger.info("stop pulling ...");
- for (PullThread puller : consumers) {
- puller.shutdown();
- }
+ puller.shutdown();
- executorService.shutdown();
+ logger.info("stop TopicConfigPollingService ...");
+ topicConfigPollingService.shutdown();
try {
- executorService.awaitTermination(10L, TimeUnit.SECONDS);
+ topicConfigPollingThread.join();
+
+ executorService.shutdown();
+ executorService.awaitTermination(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("executor.awaitTermination", e);
Thread.currentThread().interrupt();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 3a07e2f9..9e4ab455 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -54,36 +54,28 @@ import org.springframework.stereotype.Service;
*/
@Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-public class PullThread implements Runnable {
+//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class Puller implements Runnable {
@Autowired
- private DmaapService dmaapService;
+ private StoreService storeService;
@Autowired
- private StoreService storeService;
+ private TopicConfigPollingService topicConfigPollingService;
@Autowired
private ApplicationConfiguration config;
private final Logger log = LoggerFactory.getLogger(this.getClass());
- private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
- private int id;
+ private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text
- private final AtomicBoolean active = new AtomicBoolean(false);
+ private boolean active = false;
private boolean async;
- public PullThread(int id) {
- this.id = id;
- }
-
@PostConstruct
private void init() {
async = config.isAsync();
- Properties consumerConfig = getConsumerConfig();
- log.info("Kafka ConsumerConfig: {}", consumerConfig);
- consumer = new KafkaConsumer<>(consumerConfig);
}
private Properties getConsumerConfig() {
@@ -91,13 +83,16 @@ public class PullThread implements Runnable {
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
- consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
+ consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ // consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+ // consumerConfig.put("sasl.mechanism", "PLAIN");
+
return consumerConfig;
}
@@ -106,55 +101,69 @@ public class PullThread implements Runnable {
*/
@Override
public void run() {
- active.set(true);
+ active = true;
+ Properties consumerConfig = getConsumerConfig();
+ log.info("Kafka ConsumerConfig: {}", consumerConfig);
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
+ consumerLocal.set(consumer);
DummyRebalanceListener rebalanceListener = new DummyRebalanceListener();
try {
- List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
-
- log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
- consumer.subscribe(topics, rebalanceListener);
-
- while (active.get()) {
-
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
- if (records != null) {
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
+ while (active) {
+ if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
+ List<String> topics = topicConfigPollingService.getActiveTopics();
+ log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
+ consumer.subscribe(topics, rebalanceListener);
}
- storeService.flushStall();
+
+ pull();
}
+ storeService.flush(); // force flush all buffer
} catch (Exception e) {
- log.error("Puller {} run(): exception={}", id, e.getMessage());
- log.error("", e);
+ log.error("Puller run() exception.", e);
} finally {
consumer.close();
+ log.info("Puller exited run().");
+ }
+ }
+
+ private void pull() {
+ KafkaConsumer<String, String> consumer = consumerLocal.get();
+
+ log.debug("pulling...");
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+ log.debug("done pulling.");
+
+ if (records != null && records.count() > 0) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ } else {
+ log.debug("no record from this polling.");
}
+ storeService.flushStall();
}
public void shutdown() {
- active.set(false);
- consumer.wakeup();
+ active = false;
}
private class DummyRebalanceListener implements ConsumerRebalanceListener {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 2d00a9b8..126e23b2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,15 +22,12 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import javax.annotation.PostConstruct;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
-
-import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
@@ -38,12 +35,9 @@ import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -64,7 +58,7 @@ public class StoreService {
private ApplicationConfiguration config;
@Autowired
- private TopicService topicService;
+ private TopicConfigPollingService configPollingService;
@Autowired
private MongodbService mongodbService;
@@ -78,8 +72,6 @@ public class StoreService {
@Autowired
private HdfsService hdfsService;
- private Map<String, TopicConfig> topicMap = new HashMap<>();
-
private ObjectMapper yamlReader;
@PostConstruct
@@ -88,28 +80,26 @@ public class StoreService {
}
public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
- if (messages == null || messages.isEmpty()) {
+ if (CollectionUtils.isEmpty(messages)) {
return;
}
- TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
- return topicService.getEffectiveTopic(topicStr);
- });
+ TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
List<JSONObject> docs = new ArrayList<>();
for (Pair<Long, String> pair : messages) {
try {
- docs.add(messageToJson(topic, pair));
- } catch (Exception e) {
+ docs.add(messageToJson(topicConfig, pair));
+ } catch (IOException e) {
log.error(pair.getRight(), e);
}
}
- saveJsons(topic, docs, messages);
+ saveJsons(topicConfig, docs, messages);
}
- private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+ private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -176,7 +166,11 @@ public class StoreService {
}
}
- public void flushStall() {
+ public void flush() { //force flush all buffer
+ hdfsService.flush();
+ }
+
+ public void flushStall() { //flush stall buffer
hdfsService.flushStall();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
new file mode 100644
index 00000000..80da55fd
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -0,0 +1,141 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service to check topic changes in Kafka and topic setting updates
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class TopicConfigPollingService implements Runnable {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ ApplicationConfiguration config;
+
+ @Autowired
+ private DmaapService dmaapService;
+
+ //effective TopicConfig Map
+ private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
+
+ //monitor Kafka topic list changes
+ private List<String> activeTopics;
+ private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
+ private int currentActiveTopicsVersion = -1;
+
+ private boolean active = false;
+
+ @PostConstruct
+ private void init() {
+ try {
+ log.info("init(), ccalling poll()...");
+ activeTopics = poll();
+ currentActiveTopicsVersion++;
+ } catch (Exception ex) {
+ log.error("error connection to HDFS.", ex);
+ }
+ }
+
+ public boolean isActiveTopicsChanged(boolean update) {
+ boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
+ log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
+ if (changed && update) {
+ activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+ }
+
+ return changed;
+ }
+
+ public List<String> getActiveTopics() {
+ return activeTopics;
+ }
+
+ public TopicConfig getEffectiveTopicConfig(String topicStr) {
+ return effectiveTopicConfigMap.get(topicStr);
+ }
+
+ @Override
+ public void run() {
+ active = true;
+ log.info("TopicConfigPollingService started.");
+
+ while (active) {
+ try { //sleep first since we already poll in init()
+ Thread.sleep(config.getDmaapCheckNewTopicInterval());
+ } catch (InterruptedException e) {
+ log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
+ }
+
+ try {
+ List<String> newTopics = poll();
+ if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
+ log.info("activeTopics list is updated, old={}", activeTopics);
+ log.info("activeTopics list is updated, new={}", newTopics);
+
+ activeTopics = newTopics;
+ currentActiveTopicsVersion++;
+ } else {
+ log.debug("activeTopics list is not updated.");
+ }
+ } catch (IOException e) {
+ log.error("dmaapService.getActiveTopics()", e);
+ }
+ }
+
+ log.info("exit since active is set to false");
+ }
+
+ public void shutdown() {
+ active = false;
+ }
+
+ private List<String> poll() throws IOException {
+ log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
+ List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
+ activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+
+ List<String> ret = new ArrayList<>(activeTopicConfigs.size());
+ activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+
+ return ret;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
index aada44bc..d6c26bff 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
@@ -55,4 +55,8 @@ public class Util {
return replaceDotInKey(newJson);// there maybe more to replace
}
}
+
+ public static boolean isStall(long lastTime, long checkInterval) {
+ return System.currentTimeMillis() > lastTime + checkInterval;
+ }
}