From 7d3fb8e0a193c1471fcde17ddbe855b5ac1ebd70 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Thu, 20 Jun 2019 15:30:44 -0700 Subject: No data is lost when program is killed When the program is killed by unix 'kill' or ctrl+c, the program should exit gracefully after all data is saved to DBs. Issue-ID: DCAEGEN2-1633 Change-Id: Ic134440be507faa44d04434eeaea1035ce7d63f0 Signed-off-by: Guobiao Mo --- .../feeder/config/ApplicationConfiguration.java | 4 +++ .../feeder/controller/FeederController.java | 10 +++++-- .../datalake/feeder/service/CouchbaseService.java | 9 +++++- .../onap/datalake/feeder/service/DmaapService.java | 11 +++++-- .../feeder/service/ElasticsearchService.java | 34 ++++++++++++++++------ .../onap/datalake/feeder/service/HdfsService.java | 19 ++++++++++-- .../datalake/feeder/service/MongodbService.java | 9 +++++- .../onap/datalake/feeder/service/PullService.java | 25 +++++++++------- .../feeder/service/TopicConfigPollingService.java | 15 +++++++--- .../org/onap/datalake/feeder/util/JsonUtil.java | 1 - 10 files changed, 104 insertions(+), 33 deletions(-) (limited to 'components/datalake-handler/feeder/src/main/java/org') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index fa9f7d98..3e67f38a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -20,6 +20,8 @@ package org.onap.datalake.feeder.config; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -41,6 +43,8 @@ 
import lombok.Setter; @EnableAutoConfiguration public class ApplicationConfiguration { + final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock(); + //App general private boolean async; private boolean enableSSL; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java index 6a44c4f2..0a64ddb3 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java @@ -58,9 +58,12 @@ public class FeederController { @ResponseBody @ApiOperation(value="Start pulling data.") public String start() throws IOException { - log.info("DataLake feeder starting to pull data from DMaaP..."); + log.info("Going to start DataLake feeder ..."); if(pullService.isRunning() == false) { pullService.start(); + log.info("DataLake feeder started."); + }else { + log.info("DataLake feeder already started."); } return "{\"running\": true}"; } @@ -72,11 +75,14 @@ public class FeederController { @ResponseBody @ApiOperation(value="Stop pulling data.") public String stop() { + log.info("Going to stop DataLake feeder ..."); if(pullService.isRunning() == true) { pullService.shutdown(); + log.info("DataLake feeder is stopped."); + }else { + log.info("DataLake feeder already stopped."); } - log.info("DataLake feeder is stopped."); return "{\"running\": false}"; } /** diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index d7d5f873..fc31b2eb 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ 
b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -93,7 +93,14 @@ public class CouchbaseService { @PreDestroy public void cleanUp() { - bucket.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("bucket.close() at cleanUp."); + bucket.close(); + } finally { + config.getShutdownLock().readLock().unlock(); + } } public void saveJsons(TopicConfig topic, List jsons) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index 3be5be6e..5c544d6c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -62,8 +62,15 @@ public class DmaapService { @PreDestroy public void cleanUp() throws InterruptedException { - if (zk != null) { - zk.close(); + config.getShutdownLock().readLock().lock(); + + try { + if (zk != null) { + log.info("cleanUp() called, close zk."); + zk.close(); + } + } finally { + config.getShutdownLock().readLock().unlock(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 2806e48b..b40f544c 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -73,7 +73,7 @@ public class ElasticsearchService { private RestHighLevelClient client; ActionListener listener; - + //ES Encrypted communication 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html @PostConstruct @@ -89,7 +89,9 @@ public class ElasticsearchService { listener = new ActionListener() { @Override public void onResponse(BulkResponse bulkResponse) { - + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } } @Override @@ -101,7 +103,16 @@ public class ElasticsearchService { @PreDestroy public void cleanUp() throws IOException { - client.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("cleanUp() closing Elasticsearch client."); + client.close(); + } catch (IOException e) { + log.error("client.close() at cleanUp.", e); + } finally { + config.getShutdownLock().readLock().unlock(); + } } public void ensureTableExist(String topic) throws IOException { @@ -120,6 +131,7 @@ public class ElasticsearchService { //TTL is not supported in Elasticsearch 5.0 and later, what can we do? 
FIXME public void saveJsons(TopicConfig topic, List jsons) { + BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { @@ -128,11 +140,11 @@ public class ElasticsearchService { if (found) { continue; } - } - + } + String id = topic.getMessageId(json); //id can be null - - request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); + + request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); @@ -141,13 +153,17 @@ public class ElasticsearchService { client.bulkAsync(request, RequestOptions.DEFAULT, listener); } else { try { - client.bulk(request, RequestOptions.DEFAULT); + BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); + if(bulkResponse.hasFailures()) { + log.debug(bulkResponse.buildFailureMessage()); + } } catch (IOException e) { log.error(topic.getName(), e); } } + } - + /** * * @param topic diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java index 135a2c09..d92d05ac 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ShutdownHookManager; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.dto.TopicConfig; @@ -93,7 +94,7 @@ public class HdfsService { lastFlush = 
System.currentTimeMillis(); } } catch (IOException e) { - log.error("error saving to HDFS." + topic, e); + log.error("{} error saving to HDFS. {}", topic, e.getMessage()); } } @@ -134,11 +135,12 @@ public class HdfsService { out.writeUTF(message); out.write('\n'); } catch (IOException e) { - log.error("error writing to HDFS.", e); + log.error("error writing to HDFS. {}", e.getMessage()); } }); out.close(); + log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath); } } @@ -161,6 +163,10 @@ public class HdfsService { fileSystem = FileSystem.get(hdfsConfig); + //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data + ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get(); + hadoopShutdownHookManager.clearShutdownHooks(); + isReady = true; } catch (Exception ex) { log.error("error connection to HDFS.", ex); @@ -170,20 +176,27 @@ public class HdfsService { @PreDestroy public void cleanUp() { + config.getShutdownLock().readLock().lock(); + try { + log.info("fileSystem.close() at cleanUp."); flush(); fileSystem.close(); } catch (IOException e) { log.error("fileSystem.close() at cleanUp.", e); + } finally { + config.getShutdownLock().readLock().unlock(); } } public void flush() { + log.info("Force flush ALL data, regardless of stall"); bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic)); } //if no new data comes in for a topic for a while, need to flush its buffer public void flushStall() { + log.debug("Flush stall data"); bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); } @@ -198,7 +211,7 @@ public class HdfsService { if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) { buffer.flush(topicStr); } else { - log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize()); + log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", 
topicStr, buffer.getData().size(), config.getHdfsBatchSize()); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index 32d21c62..f3462e49 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -131,7 +131,14 @@ public class MongodbService { @PreDestroy public void cleanUp() { - mongoClient.close(); + config.getShutdownLock().readLock().lock(); + + try { + log.info("mongoClient.close() at cleanUp."); + mongoClient.close(); + } finally { + config.getShutdownLock().readLock().unlock(); + } } public void saveJsons(TopicConfig topic, List jsons) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java index 7ed88797..84d5f337 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -52,7 +52,7 @@ public class PullService { @Autowired private TopicConfigPollingService topicConfigPollingService; - + @Autowired private ApplicationConfiguration config; @@ -80,7 +80,7 @@ public class PullService { for (int i = 0; i < numConsumers; i++) { executorService.submit(puller); } - + topicConfigPollingThread = new Thread(topicConfigPollingService); topicConfigPollingThread.setName("TopicConfigPolling"); topicConfigPollingThread.start(); @@ -98,22 +98,27 @@ public class PullService { return; } - logger.info("stop pulling ..."); - puller.shutdown(); + config.getShutdownLock().writeLock().lock(); + try { + logger.info("stop pulling 
..."); + puller.shutdown(); - logger.info("stop TopicConfigPollingService ..."); - topicConfigPollingService.shutdown(); + logger.info("stop TopicConfigPollingService ..."); + topicConfigPollingService.shutdown(); - try { topicConfigPollingThread.join(); - + executorService.shutdown(); executorService.awaitTermination(120L, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.error("executor.awaitTermination", e); + logger.error("shutdown(): executor.awaitTermination", e); Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.error("shutdown error.", e); + } finally { + config.getShutdownLock().writeLock().unlock(); } - + isRunning = false; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 58b27834..21e1a08f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -37,7 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service to check topic changes in Kafka and topic setting updates + * Service to check topic changes in Kafka and topic setting updates in DB * * @author Guobiao Mo * @@ -74,7 +74,7 @@ public class TopicConfigPollingService implements Runnable { } } - public boolean isActiveTopicsChanged(boolean update) { + public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get(); log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get()); if (changed && update) { @@ -96,10 +96,13 @@ 
public class TopicConfigPollingService implements Runnable { public void run() { active = true; log.info("TopicConfigPollingService started."); - + while (active) { try { //sleep first since we already pool in init() Thread.sleep(config.getDmaapCheckNewTopicInterval()); + if(!active) { + break; + } } catch (InterruptedException e) { log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e); Thread.currentThread().interrupt(); @@ -131,7 +134,11 @@ public class TopicConfigPollingService implements Runnable { private List poll() throws IOException { log.debug("poll(), use dmaapService to getActiveTopicConfigs..."); List activeTopicConfigs = dmaapService.getActiveTopicConfigs(); - activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); + Map tempEffectiveTopicConfigMap = new HashMap<>(); + + activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); + effectiveTopicConfigMap = tempEffectiveTopicConfigMap; + log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap); List ret = new ArrayList<>(activeTopicConfigs.size()); activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName())); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java index db4dcfae..5c77d895 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java @@ -22,7 +22,6 @@ package org.onap.datalake.feeder.util; import java.util.HashMap; -import org.apache.commons.collections.CollectionUtils; import org.json.JSONArray; import org.json.JSONObject; -- cgit 1.2.3-korg