From aa70d5683655fd13d476441c056c877f76796a7c Mon Sep 17 00:00:00 2001
From: Guobiao Mo
Date: Sun, 26 May 2019 23:08:21 -0700
Subject: Dynamically detect topic updates and new topics

Issue-ID: DCAEGEN2-1195
Change-Id: I35d36a9aafe3a7681a9d4745bc509aded111b29d
Signed-off-by: Guobiao Mo
---
 .../feeder/config/ApplicationConfiguration.java    |   4 +-
 .../datalake/feeder/enumeration/DataFormat.java    |  31 ++--
 .../datalake/feeder/service/CouchbaseService.java  |  23 ++-
 .../onap/datalake/feeder/service/DmaapService.java |  68 ++++++--
 .../feeder/service/ElasticsearchService.java       |   8 +-
 .../onap/datalake/feeder/service/HdfsService.java  |  42 +++--
 .../onap/datalake/feeder/service/PullService.java  |  31 ++--
 .../onap/datalake/feeder/service/PullThread.java   | 172 --------------------
 .../org/onap/datalake/feeder/service/Puller.java   | 181 +++++++++++++++++++++
 .../onap/datalake/feeder/service/StoreService.java |  32 ++--
 .../feeder/service/TopicConfigPollingService.java  | 141 ++++++++++++++++
 .../java/org/onap/datalake/feeder/util/Util.java   |   4 +
 12 files changed, 475 insertions(+), 262 deletions(-)
 delete mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
 create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 2a72a76f..73067182 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -57,7 +57,7 @@ public class ApplicationConfiguration {
 	private long dmaapKafkaTimeout;
 	private String[] dmaapKafkaExclude;
 
-	private int dmaapCheckNewTopicIntervalInSec;
+	private int dmaapCheckNewTopicInterval; //in milliseconds
 
 	private int kafkaConsumerCount;
 
@@ -69,5 +69,5 @@ public class ApplicationConfiguration {
 	private int hdfsBatchSize;
 
 	//Version
-	private String DatalakeVersion;
+	private String datalakeVersion;
 }

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
index fdcbdfc1..b7bf513c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
@@ -26,21 +26,20 @@ package org.onap.datalake.feeder.enumeration;
  *
  */
 public enum DataFormat {
-	JSON, XML, YAML, TEXT;
+	JSON("JSON"), XML("XML"), YAML("YAML"), TEXT("TEXT");
 
-	public static DataFormat fromString(String s) {
-		if ("JSON".equalsIgnoreCase(s)) {
-			return JSON;
-		}
-		if ("XML".equalsIgnoreCase(s)) {
-			return XML;
-		}
-		if ("YAML".equalsIgnoreCase(s)) {
-			return YAML;
-		}
-		if ("TEXT".equalsIgnoreCase(s)) {
-			return TEXT;
-		}
-		throw new IllegalArgumentException("Invalid value for format: " + s);
-	}
+	private final String name;
+
+	DataFormat(String name) {
+		this.name = name;
+	}
+
+	public static DataFormat fromString(String s) {
+		for (DataFormat df : DataFormat.values()) {
+			if (df.name.equalsIgnoreCase(s)) {
+				return df;
+			}
+		}
+		throw new IllegalArgumentException("Invalid value for format: " + s);
+	}
 }

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 7c237766..d7d5f873 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -32,7 +33,6 @@ import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -40,10 +40,10 @@ import com.couchbase.client.java.Bucket;
 import com.couchbase.client.java.Cluster;
 import com.couchbase.client.java.CouchbaseCluster;
 import com.couchbase.client.java.document.JsonDocument;
-import com.couchbase.client.java.document.JsonLongDocument;
 import com.couchbase.client.java.document.json.JsonObject;
 import com.couchbase.client.java.env.CouchbaseEnvironment;
 import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+import com.couchbase.client.java.error.DocumentAlreadyExistsException;
 
 import rx.Observable;
 import rx.functions.Func1;
@@ -83,7 +83,7 @@ public class CouchbaseService {
 			// Create a N1QL Primary Index (but ignore if it exists)
 			bucket.bucketManager().createN1qlPrimaryIndex(true, false);
 
-			log.info("Connected to Couchbase {}", couchbase.getHost());
+			log.info("Connected to Couchbase {} as {}", couchbase.getHost(), couchbase.getLogin());
 			isReady = true;
 		} catch (Exception ex) {
 			log.error("error connection to Couchbase.", ex);
@@ -113,6 +113,16 @@ public class CouchbaseService {
 		}
 		try {
 			saveDocuments(documents);
+		} catch (DocumentAlreadyExistsException e) {
+			log.error("Some or all of the following ids are duplicates.");
+			for (JsonDocument document : documents) {
+				log.error("saveJsons() DocumentAlreadyExistsException {}", document.id());
+			}
+		} catch (rx.exceptions.CompositeException e) {
+			List<Throwable> causes = e.getExceptions();
+			for (Throwable cause : causes) {
+				log.error("saveJsons() CompositeException cause {}", cause.getMessage());
+			}
 		} catch (Exception e) {
 			log.error("error saving to Couchbase.", e);
 		}
@@ -127,14 +137,15 @@ public class CouchbaseService {
 		}
 
 		String topicStr = topic.getName();
-		//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
+		id = topicStr + ":" + UUID.randomUUID();
 
 		//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
 		//atomically get the next sequence number:
 		// increment by 1, initialize at 0 if counter doc not found
 		//TODO how slow is this compared with above UUID approach?
-		JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
-		id = topicStr + ":" + nextIdNumber.content();
+		//sometimes this gives java.util.concurrent.TimeoutException
+		//JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
+		//id = topicStr + ":" + nextIdNumber.content();
 
 		return id;
 	}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 02aa6264..2274ce99 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -25,9 +25,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.dto.TopicConfig;
@@ -53,39 +58,78 @@ public class DmaapService {
 
 	@Autowired
 	private TopicService topicService;
 
+	ZooKeeper zk;
+
+	@PreDestroy
+	public void cleanUp() throws InterruptedException {
+		zk.close();
+	}
+
+	@PostConstruct
+	private void init() throws IOException, InterruptedException {
+		zk = connect(config.getDmaapZookeeperHostPort());
+	}
+
 	//get all topic names from Zookeeper
 	public List<String> getTopics() {
 		try {
-			Watcher watcher = new Watcher() {
-				@Override
-				public void process(WatchedEvent event) {
-					// TODO monitor new topics
-
-				}
-			};
-			ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
+			if (zk == null) {
+				zk = connect(config.getDmaapZookeeperHostPort());
+			}
+			log.info("connecting to ZooKeeper {} for a list of topics.", config.getDmaapZookeeperHostPort());
 			List<String> topics = zk.getChildren("/brokers/topics", false);
 			String[] excludes = config.getDmaapKafkaExclude();
 			topics.removeAll(Arrays.asList(excludes));
-			zk.close();
+			log.info("list of topics: {}", topics);
 			return topics;
 		} catch (Exception e) {
+			zk = null;
 			log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
 			return Collections.emptyList();
 		}
 	}
 
-	public List<String> getActiveTopics() throws IOException {
+	private ZooKeeper connect(String host) throws IOException, InterruptedException {
+		log.info("connecting to ZooKeeper {} ...", config.getDmaapZookeeperHostPort());
+		CountDownLatch connectedSignal = new CountDownLatch(1);
+		ZooKeeper ret = new ZooKeeper(host, 10000, new Watcher() {
+			public void process(WatchedEvent we) {
+				if (we.getState() == KeeperState.SyncConnected) {
+					connectedSignal.countDown();
+				}
+			}
+		});
+
+		connectedSignal.await();
+		return ret;
+	}
+
+	/*
+	public List<String> getActiveTopics() throws IOException {
+		log.debug("entering getActiveTopics()...");
+
+		List<TopicConfig> configList = getActiveTopicConfigs();
+
+		List<String> ret = new ArrayList<>(configList.size());
+		configList.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+
+		return ret;
+	}
+	*/
+	public List<TopicConfig> getActiveTopicConfigs() throws IOException {
+		log.debug("entering getActiveTopicConfigs()...");
 		List<String> allTopics = getTopics();
 		if (allTopics == null) {
 			return Collections.emptyList();
 		}
 
-		List<String> ret = new ArrayList<>(allTopics.size());
+		List<TopicConfig> ret = new ArrayList<>(allTopics.size());
 		for (String topicStr : allTopics) {
+			log.debug("get topic setting from DB: {}.", topicStr);
+			TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
 			if (topicConfig.isEnabled()) {
-				ret.add(topicStr);
+				ret.add(topicConfig);
 			}
 		}
 		return ret;

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index f1bed604..2806e48b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -55,7 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Service to use Elasticsearch
+ * Elasticsearch Service for table creation, data submission, as well as data pre-processing.
  *
  * @author Guobiao Mo
  *
@@ -172,11 +172,7 @@ public class ElasticsearchService {
 		String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
 		String specificProblem = json.query("/event/faultFields/specificProblem").toString();
 
-		String id = null;
-		StringBuilder stringBuilder = new StringBuilder();
-		stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
-
-		id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+		String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb"
 
 		String index = topic.getName().toLowerCase();
 
 		//get

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
index edbc6757..135a2c09 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
@@ -21,6 +21,7 @@
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.dto.TopicConfig;
+import org.onap.datalake.feeder.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,31 +87,44 @@ public class HdfsService {
 
 		public void flush(String topic) {
 			try {
-				saveMessages(topic, data);
-				data.clear();
-				lastFlush = System.currentTimeMillis();
-				log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+				if (!data.isEmpty()) {
+					saveMessages(topic, data);
+					data.clear();
+					lastFlush = System.currentTimeMillis();
+				}
 			} catch (IOException e) {
 				log.error("error saving to HDFS." + topic, e);
 			}
 		}
 
 		public void flushStall(String topic) {
-			if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+			if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
 				log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
 				flush(topic);
 			}
 		}
 
+		public void addData(List<Pair<Long, String>> messages) {
+			if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
+				lastFlush = System.currentTimeMillis();
+			}
+
+			messages.stream().forEach(message -> data.add(message.getRight()));//note that message left is not used
+		}
+
 		private void saveMessages(String topic, List<String> bufferList) throws IOException {
-			String thread = Thread.currentThread().getName();
+			long thread = Thread.currentThread().getId();
 			Date date = new Date();
 			String day = dayFormat.get().format(date);
 			String time = timeFormat.get().format(date);
-			String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+
+			InetAddress inetAddress = InetAddress.getLocalHost();
+			String hostName = inetAddress.getHostName();
+
+			String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
 			Path path = new Path(filePath);
-			log.debug("writing to HDFS {}", filePath);
+			log.debug("writing {} to HDFS {}", bufferList.size(), filePath);
 			// Create a new file and write data to it.
 			FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
@@ -140,9 +155,8 @@ public class HdfsService {
 		String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
 		hdfsConfig.set("fs.defaultFS", hdfsuri);
 
-		//hdfsConfig.set("hadoop.job.ugi", hdfs.getLogin());
 		System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());
-
+		log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());
 		fileSystem = FileSystem.get(hdfsConfig);
@@ -179,12 +193,12 @@ public class HdfsService {
 		Map<String, Buffer> bufferMap = bufferLocal.get();
 		final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
 
-		List<String> bufferData = buffer.getData();
-
-		messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+		buffer.addData(messages);
 
-		if (bufferData.size() >= config.getHdfsBatchSize()) {
+		if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
 			buffer.flush(topicStr);
+		} else {
+			log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
 		}
 	}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 48d167b5..7ed88797 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -21,8 +21,6 @@
 package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +29,6 @@ import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 
 /**
@@ -48,11 +45,14 @@ public class PullService {
 
 	private boolean isRunning = false;
 	private ExecutorService executorService;
-	private List<PullThread> consumers;
+	private Thread topicConfigPollingThread;
 
 	@Autowired
-	private ApplicationContext context;
+	private Puller puller;
 
+	@Autowired
+	private TopicConfigPollingService topicConfigPollingService;
+
 	@Autowired
 	private ApplicationConfiguration config;
 
@@ -74,17 +74,16 @@ public class PullService {
 		}
 
 		logger.info("start pulling ...");
-
 		int numConsumers = config.getKafkaConsumerCount();
 		executorService = Executors.newFixedThreadPool(numConsumers);
-		consumers = new ArrayList<>(numConsumers);
 		for (int i = 0; i < numConsumers; i++) {
-			PullThread puller = context.getBean(PullThread.class, i);
-			consumers.add(puller);
 			executorService.submit(puller);
 		}
+
+		topicConfigPollingThread = new Thread(topicConfigPollingService);
+		topicConfigPollingThread.setName("TopicConfigPolling");
+		topicConfigPollingThread.start();
 
 		isRunning = true;
@@ -100,14 +99,16 @@ public class PullService {
 		}
 
 		logger.info("stop pulling ...");
-		for (PullThread puller : consumers) {
-			puller.shutdown();
-		}
+		puller.shutdown();
 
-		executorService.shutdown();
+		logger.info("stop TopicConfigPollingService ...");
+		topicConfigPollingService.shutdown();
 
 		try {
-			executorService.awaitTermination(10L, TimeUnit.SECONDS);
+			topicConfigPollingThread.join();
+
+			executorService.shutdown();
+			executorService.awaitTermination(120L, TimeUnit.SECONDS);
 		} catch (InterruptedException e) {
 			logger.error("executor.awaitTermination", e);
 			Thread.currentThread().interrupt();

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
deleted file mode 100644
index 3a07e2f9..00000000
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
-* ============LICENSE_START=======================================================
-* ONAP : DATALAKE
-* ================================================================================
-* Copyright 2019 China Mobile
-*=================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* ============LICENSE_END=========================================================
-*/
-
-package org.onap.datalake.feeder.service;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
-import org.springframework.stereotype.Service;
-
-/**
- * Thread that pulls messages from DMaaP and save them to Big Data DBs
- *
- * @author Guobiao Mo
- *
- */
-
-@Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-public class PullThread implements Runnable {
-
-	@Autowired
-	private DmaapService dmaapService;
-
-	@Autowired
-	private StoreService storeService;
-
-	@Autowired
-	private ApplicationConfiguration config;
-
-	private final Logger log = LoggerFactory.getLogger(this.getClass());
-
-	private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
-	private int id;
-
-	private final AtomicBoolean active = new AtomicBoolean(false);
-	private boolean async;
-
-	public PullThread(int id) {
-		this.id = id;
-	}
-
-	@PostConstruct
-	private void init() {
-		async = config.isAsync();
-		Properties consumerConfig = getConsumerConfig();
-		log.info("Kafka ConsumerConfig: {}", consumerConfig);
-		consumer = new KafkaConsumer<>(consumerConfig);
-	}
-
-	private Properties getConsumerConfig() {
-		Properties consumerConfig = new Properties();
-
-		consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
-		consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
-		consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
-		consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-		consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-		consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-		consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
-		consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-		return consumerConfig;
-	}
-
-	/**
-	 * start pulling.
-	 */
-	@Override
-	public void run() {
-		active.set(true);
-
-		DummyRebalanceListener rebalanceListener = new DummyRebalanceListener();
-
-		try {
-			List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
-
-			log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
-			consumer.subscribe(topics, rebalanceListener);
-
-			while (active.get()) {
-
-				ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-				if (records != null) {
-					List<Pair<Long, String>> messages = new ArrayList<>(records.count());
-					for (TopicPartition partition : records.partitions()) {
-						messages.clear();
-						List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
-						for (ConsumerRecord<String, String> record : partitionRecords) {
-							messages.add(Pair.of(record.timestamp(), record.value()));
-							//log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
-						}
-						storeService.saveMessages(partition.topic(), messages);
-						log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
-						if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
-							long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-							consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
-						}
-					}
-
-					if (async) {//for high Throughput, async commit offset in batch to Kafka
-						consumer.commitAsync();
-					}
-				}
-				storeService.flushStall();
-			}
-		} catch (Exception e) {
-			log.error("Puller {} run(): exception={}", id, e.getMessage());
-			log.error("", e);
-		} finally {
-			consumer.close();
-		}
-	}
-
-	public void shutdown() {
-		active.set(false);
-		consumer.wakeup();
-	}
-
-	private class DummyRebalanceListener implements ConsumerRebalanceListener {
-		@Override
-		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-			log.info("Called onPartitionsRevoked with partitions: {}", partitions);
-		}
-
-		@Override
-		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-			log.info("Called onPartitionsAssigned with partitions: {}", partitions);
-		}
-	}
-
-}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
new file mode 100644
index 00000000..9e4ab455
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -0,0 +1,181 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+/**
+ * Thread that pulls messages from DMaaP and saves them to Big Data DBs
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@Service
+//@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class Puller implements Runnable {
+
+	@Autowired
+	private StoreService storeService;
+
+	@Autowired
+	private TopicConfigPollingService topicConfigPollingService;
+
+	@Autowired
+	private ApplicationConfiguration config;
+
+	private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	private ThreadLocal<KafkaConsumer<String, String>> consumerLocal = new ThreadLocal<>(); //<String, String> is key-value type, in our case key is empty, value is JSON text
+
+	private boolean active = false;
+	private boolean async;
+
+	@PostConstruct
+	private void init() {
+		async = config.isAsync();
+	}
+
+	private Properties getConsumerConfig() {
+		Properties consumerConfig = new Properties();
+
+		consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
+		consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+		consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(Thread.currentThread().getId()));
+		consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
+		consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+		// consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+		// consumerConfig.put("sasl.mechanism", "PLAIN");
+
+		return consumerConfig;
+	}
+
+	/**
+	 * start pulling.
+	 */
+	@Override
+	public void run() {
+		active = true;
+		Properties consumerConfig = getConsumerConfig();
+		log.info("Kafka ConsumerConfig: {}", consumerConfig);
+		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
+		consumerLocal.set(consumer);
+
+		DummyRebalanceListener rebalanceListener = new DummyRebalanceListener();
+
+		try {
+			while (active) {
+				if (topicConfigPollingService.isActiveTopicsChanged(true)) {//true means update local version as well
+					List<String> topics = topicConfigPollingService.getActiveTopics();
+					log.info("Active Topic list is changed, subscribe to the latest topics: {}", topics);
+					consumer.subscribe(topics, rebalanceListener);
+				}
+
+				pull();
+			}
+			storeService.flush(); // force flush all buffer
+		} catch (Exception e) {
+			log.error("Puller run() exception.", e);
+		} finally {
+			consumer.close();
+			log.info("Puller exited run().");
+		}
+	}
+
+	private void pull() {
+		KafkaConsumer<String, String> consumer = consumerLocal.get();
+
+		log.debug("pulling...");
+		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+		log.debug("done pulling.");
+
+		if (records != null && records.count() > 0) {
+			List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+			for (TopicPartition partition : records.partitions()) {
+				messages.clear();
+				List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+				for (ConsumerRecord<String, String> record : partitionRecords) {
+					messages.add(Pair.of(record.timestamp(), record.value()));
+					//log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+				}
+				storeService.saveMessages(partition.topic(), messages);
+				log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+				if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+					long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+					consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+				}
+			}
+
+			if (async) {//for high throughput, async commit offset in batch to Kafka
+				consumer.commitAsync();
+			}
+		} else {
+			log.debug("no record from this polling.");
+		}
+		storeService.flushStall();
+	}
+
+	public void shutdown() {
+		active = false;
+	}
+
+	private class DummyRebalanceListener implements ConsumerRebalanceListener {
+		@Override
+		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+			log.info("Called onPartitionsRevoked with partitions: {}", partitions);
+		}
+
+		@Override
+		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+			log.info("Called onPartitionsAssigned with partitions: {}", partitions);
+		}
+	}
+
+}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 2d00a9b8..126e23b2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -22,15 +22,12 @@ package org.onap.datalake.feeder.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
-
-import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
@@ -38,12 +35,9 @@ import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
@@ -64,7 +58,7 @@ public class StoreService {
 	private ApplicationConfiguration config;
 
 	@Autowired
-	private TopicService topicService;
+	private TopicConfigPollingService configPollingService;
 
 	@Autowired
 	private MongodbService mongodbService;
@@ -78,8 +72,6 @@ public class StoreService {
 	@Autowired
 	private HdfsService hdfsService;
 
-	private Map<String, TopicConfig> topicMap = new HashMap<>();
-
 	private ObjectMapper yamlReader;
 
 	@PostConstruct
@@ -88,28 +80,26 @@ public class StoreService {
 	}
 
 	public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
-		if (messages == null || messages.isEmpty()) {
+		if (CollectionUtils.isEmpty(messages)) {
 			return;
 		}
 
-		TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
-			return topicService.getEffectiveTopic(topicStr);
-		});
+		TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
 
 		List<JSONObject> docs = new ArrayList<>();
 		for (Pair<Long, String> pair : messages) {
 			try {
-				docs.add(messageToJson(topic, pair));
-			} catch (Exception e) {
+				docs.add(messageToJson(topicConfig, pair));
+			} catch (IOException e) {
 				log.error(pair.getRight(), e);
 			}
 		}
 
-		saveJsons(topic, docs, messages);
+		saveJsons(topicConfig, docs, messages);
 	}
 
-	private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+	private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
 		long timestamp = pair.getLeft();
 		String text = pair.getRight();
 
@@ -176,7 +166,11 @@ public class StoreService {
 		}
 	}
 
-	public void flushStall() {
+	public void flush() { //force flush all buffer
+		hdfsService.flush();
+	}
+
+	public void flushStall() { //flush stall buffer
 		hdfsService.flushStall();
 	}
 }

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
new file mode 100644
index 00000000..80da55fd
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -0,0 +1,141 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service to check topic changes in Kafka and topic setting updates
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class TopicConfigPollingService implements Runnable {
+
+	private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	@Autowired
+	ApplicationConfiguration config;
+
+	@Autowired
+	private DmaapService dmaapService;
+
+	//effective TopicConfig Map
+	private Map<String, TopicConfig> effectiveTopicConfigMap = new HashMap<>();
+
+	//monitor Kafka topic list changes
+	private List<String> activeTopics;
+	private ThreadLocal<Integer> activeTopicsVersionLocal = ThreadLocal.withInitial(() -> -1);
+	private int currentActiveTopicsVersion = -1;
+
+	private boolean active = false;
+
+	@PostConstruct
+	private void init() {
+		try {
+			log.info("init(), calling poll()...");
+			activeTopics = poll();
+			currentActiveTopicsVersion++;
+		} catch (Exception ex) {
+			log.error("error getting the initial topic list.", ex);
+		}
+	}
+
+	public boolean isActiveTopicsChanged(boolean update) {
+		boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
+		log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
+		if (changed && update) {
+			activeTopicsVersionLocal.set(currentActiveTopicsVersion);
+		}
+
+		return changed;
+	}
+
+	public List<String> getActiveTopics() {
+		return activeTopics;
+	}
+
+	public TopicConfig getEffectiveTopicConfig(String topicStr) {
+		return effectiveTopicConfigMap.get(topicStr);
+	}
+
+	@Override
+	public void run() {
+		active = true;
+		log.info("TopicConfigPollingService started.");
+
+		while (active) {
+			try { //sleep first since we already poll in init()
+				Thread.sleep(config.getDmaapCheckNewTopicInterval());
+			} catch (InterruptedException e) {
+				log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
+			}
+
+			try {
+				List<String> newTopics = poll();
+				if (!CollectionUtils.isEqualCollection(activeTopics, newTopics)) {
+					log.info("activeTopics list is updated, old={}", activeTopics);
+					log.info("activeTopics list is updated, new={}", newTopics);
+
+					activeTopics = newTopics;
+					currentActiveTopicsVersion++;
+				} else {
+					log.debug("activeTopics list is not updated.");
+				}
+			} catch (IOException e) {
+				log.error("dmaapService.getActiveTopics()", e);
+			}
+		}
+
+		log.info("exit since active is set to false");
+	}
+
+	public void shutdown() {
+		active = false;
+	}
+
+	private List<String> poll() throws IOException {
+		log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
+		List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
+		activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+
+		List<String> ret = new ArrayList<>(activeTopicConfigs.size());
+		activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
+
+		return ret;
+	}
+
+}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
index aada44bc..d6c26bff 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
@@ -55,4 +55,8 @@ public class Util {
 			return replaceDotInKey(newJson);// there maybe more to replace
 		}
 	}
+
+	public static boolean isStall(long lastTime, long checkInterval) {
+		return System.currentTimeMillis() > lastTime + checkInterval;
+	}
 }
-- 
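
Reviewer notes follow; these are illustrative sketches, not part of the patch. First, the rewritten DataFormat.fromString(): looping over values() replaces the four hard-coded if blocks while keeping the lookup case-insensitive. A quick usage check, assuming the patched enum is on the classpath:

import org.onap.datalake.feeder.enumeration.DataFormat;

public class DataFormatDemo {
	public static void main(String[] args) {
		// case-insensitive lookup via the loop over DataFormat.values()
		System.out.println(DataFormat.fromString("json")); // JSON
		System.out.println(DataFormat.fromString("YaMl")); // YAML
		try {
			DataFormat.fromString("csv");
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage()); // Invalid value for format: csv
		}
	}
}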
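
The new DmaapService.connect() blocks on a CountDownLatch because the ZooKeeper constructor returns before the session is actually established; using the handle before it reaches SyncConnected can fail. A minimal standalone sketch of the same pattern; the host:port and timeout are placeholders, not values from the patch:

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class ZkTopicLister {

	// Block until the client reports SyncConnected, mirroring DmaapService.connect().
	public static ZooKeeper connect(String hostPort, int sessionTimeoutMs) throws IOException, InterruptedException {
		CountDownLatch connectedSignal = new CountDownLatch(1);
		ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeoutMs, (WatchedEvent we) -> {
			if (we.getState() == KeeperState.SyncConnected) {
				connectedSignal.countDown(); // the constructor returns before the session is live
			}
		});
		connectedSignal.await(); // wait for the session event before using the handle
		return zk;
	}

	public static void main(String[] args) throws Exception {
		ZooKeeper zk = connect("localhost:2181", 10000); // address is an assumption
		// Kafka registers one child znode per topic under /brokers/topics,
		// so a plain getChildren() is enough to enumerate topics.
		List<String> topics = zk.getChildren("/brokers/topics", false);
		System.out.println("topics: " + topics);
		zk.close();
	}
}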
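
Puller.pull() makes offset commits configurable: when config.isAsync() is false it calls commitSync() per partition with lastOffset + 1 right after the batch is persisted, otherwise it issues one batched commitAsync() per poll. A self-contained sketch of that trade-off; the broker address, group id, and topic name are examples only:

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitModes {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "datalake-demo"); // assumption
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // we commit ourselves

		boolean async = false; // the patch drives this from ApplicationConfiguration.isAsync()

		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Collections.singletonList("example-topic")); // assumption
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
			for (TopicPartition partition : records.partitions()) {
				List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
				// ... persist partitionRecords to the target DBs here ...
				if (!async) {
					// reliable mode: acknowledge only what was just persisted, per partition
					long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
					consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
				}
			}
			if (async) {
				consumer.commitAsync(); // higher throughput, but may re-deliver on failure
			}
		}
	}
}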
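
TopicConfigPollingService and the puller threads coordinate through a version stamp: the single polling thread bumps a counter whenever the Kafka topic list changes, and each consumer thread compares it to a ThreadLocal last-seen copy to decide whether to re-subscribe. A minimal sketch of that handshake; the names are illustrative, and volatile is added here to make cross-thread visibility explicit, a detail the patch leaves to plain fields:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TopicVersionTracker {

	private volatile List<String> activeTopics = new CopyOnWriteArrayList<>();
	private volatile int currentVersion = -1;

	// each consumer thread remembers the last version it acted on
	private final ThreadLocal<Integer> seenVersion = ThreadLocal.withInitial(() -> -1);

	// writer side: called from the single polling thread when the topic list changes
	public void publish(List<String> newTopics) {
		activeTopics = new CopyOnWriteArrayList<>(newTopics);
		currentVersion++; // safe only because there is exactly one writer thread
	}

	// reader side: a puller calls this each loop; true means "re-subscribe"
	public boolean isChanged(boolean update) {
		boolean changed = currentVersion > seenVersion.get();
		if (changed && update) {
			seenVersion.set(currentVersion); // record that this thread caught up
		}
		return changed;
	}

	public List<String> getActiveTopics() {
		return activeTopics;
	}
}

The ThreadLocal is what lets one shared service signal many pullers independently: each thread catches up at its own pace without any locking in the hot poll loop.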
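
Finally, Util.isStall() drives the time-based fallback flush: Buffer.flushStall() writes only when data has sat unflushed longer than hdfsFlushInterval, and addData() resets the clock when the buffer starts filling from empty. A tiny runnable check of that boundary behavior:

public class StallCheckDemo {

	// same logic as the isStall() added to Util in this patch
	static boolean isStall(long lastTime, long checkInterval) {
		return System.currentTimeMillis() > lastTime + checkInterval;
	}

	public static void main(String[] args) throws InterruptedException {
		long lastFlush = System.currentTimeMillis();
		long flushInterval = 100; // ms; the real value comes from the hdfsFlushInterval config
		System.out.println(isStall(lastFlush, flushInterval)); // false, just flushed
		Thread.sleep(150);
		System.out.println(isStall(lastFlush, flushInterval)); // true, overdue for a flush
	}
}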