summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-06-20 15:30:44 -0700
committerYan Yang <yangyanyj@chinamobile.com>2019-06-21 05:12:48 +0000
commit7d3fb8e0a193c1471fcde17ddbe855b5ac1ebd70 (patch)
tree6f40c532cefad95c8a42981283661a3bd6e84f6a /components/datalake-handler/feeder/src/main/java/org
parent369eb4e1b30be855a4c4224d6c0d53ca97423eaf (diff)
No data is lost when program is killed
When the program is killed by unix 'kill' or ctrl+c, the program should exit gracefully after all data is saved to DBs. Issue-ID: DCAEGEN2-1633 Change-Id: Ic134440be507faa44d04434eeaea1035ce7d63f0 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java10
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java34
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java19
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java9
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java25
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java15
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java1
10 files changed, 104 insertions, 33 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index fa9f7d98..3e67f38a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -20,6 +20,8 @@
package org.onap.datalake.feeder.config;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -41,6 +43,8 @@ import lombok.Setter;
@EnableAutoConfiguration
public class ApplicationConfiguration {
+ final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+
//App general
private boolean async;
private boolean enableSSL;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 6a44c4f2..0a64ddb3 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -58,9 +58,12 @@ public class FeederController {
@ResponseBody
@ApiOperation(value="Start pulling data.")
public String start() throws IOException {
- log.info("DataLake feeder starting to pull data from DMaaP...");
+ log.info("Going to start DataLake feeder ...");
if(pullService.isRunning() == false) {
pullService.start();
+ log.info("DataLake feeder started.");
+ }else {
+ log.info("DataLake feeder already started.");
}
return "{\"running\": true}";
}
@@ -72,11 +75,14 @@ public class FeederController {
@ResponseBody
@ApiOperation(value="Stop pulling data.")
public String stop() {
+ log.info("Going to stop DataLake feeder ...");
if(pullService.isRunning() == true)
{
pullService.shutdown();
+ log.info("DataLake feeder is stopped.");
+ }else {
+ log.info("DataLake feeder already stopped.");
}
- log.info("DataLake feeder is stopped.");
return "{\"running\": false}";
}
/**
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index d7d5f873..fc31b2eb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -93,7 +93,14 @@ public class CouchbaseService {
@PreDestroy
public void cleanUp() {
- bucket.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("bucket.close() at cleanUp.");
+ bucket.close();
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 3be5be6e..5c544d6c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -62,8 +62,15 @@ public class DmaapService {
@PreDestroy
public void cleanUp() throws InterruptedException {
- if (zk != null) {
- zk.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ if (zk != null) {
+ log.info("cleanUp() called, close zk.");
+ zk.close();
+ }
+ } finally {
+ config.getShutdownLock().readLock().unlock();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 2806e48b..b40f544c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -73,7 +73,7 @@ public class ElasticsearchService {
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
-
+
//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
@@ -89,7 +89,9 @@ public class ElasticsearchService {
listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
-
+ if(bulkResponse.hasFailures()) {
+ log.debug(bulkResponse.buildFailureMessage());
+ }
}
@Override
@@ -101,7 +103,16 @@ public class ElasticsearchService {
@PreDestroy
public void cleanUp() throws IOException {
- client.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("cleanUp() closing Elasticsearch client.");
+ client.close();
+ } catch (IOException e) {
+ log.error("client.close() at cleanUp.", e);
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
public void ensureTableExist(String topic) throws IOException {
@@ -120,6 +131,7 @@ public class ElasticsearchService {
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
@@ -128,11 +140,11 @@ public class ElasticsearchService {
if (found) {
continue;
}
- }
-
+ }
+
String id = topic.getMessageId(json); //id can be null
-
- request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
@@ -141,13 +153,17 @@ public class ElasticsearchService {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
} else {
try {
- client.bulk(request, RequestOptions.DEFAULT);
+ BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
+ if(bulkResponse.hasFailures()) {
+ log.debug(bulkResponse.buildFailureMessage());
+ }
} catch (IOException e) {
log.error(topic.getName(), e);
}
}
+
}
-
+
/**
*
* @param topic
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
index 135a2c09..d92d05ac 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.dto.TopicConfig;
@@ -93,7 +94,7 @@ public class HdfsService {
lastFlush = System.currentTimeMillis();
}
} catch (IOException e) {
- log.error("error saving to HDFS." + topic, e);
+ log.error("{} error saving to HDFS. {}", topic, e.getMessage());
}
}
@@ -134,11 +135,12 @@ public class HdfsService {
out.writeUTF(message);
out.write('\n');
} catch (IOException e) {
- log.error("error writing to HDFS.", e);
+ log.error("error writing to HDFS. {}", e.getMessage());
}
});
out.close();
+ log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
}
}
@@ -161,6 +163,10 @@ public class HdfsService {
fileSystem = FileSystem.get(hdfsConfig);
+ //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
+ ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
+ hadoopShutdownHookManager.clearShutdownHooks();
+
isReady = true;
} catch (Exception ex) {
log.error("error connection to HDFS.", ex);
@@ -170,20 +176,27 @@ public class HdfsService {
@PreDestroy
public void cleanUp() {
+ config.getShutdownLock().readLock().lock();
+
try {
+ log.info("fileSystem.close() at cleanUp.");
flush();
fileSystem.close();
} catch (IOException e) {
log.error("fileSystem.close() at cleanUp.", e);
+ } finally {
+ config.getShutdownLock().readLock().unlock();
}
}
public void flush() {
+ log.info("Force flush ALL data, regardless of stall");
bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
}
//if no new data comes in for a topic for a while, need to flush its buffer
public void flushStall() {
+ log.debug("Flush stall data");
bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
}
@@ -198,7 +211,7 @@ public class HdfsService {
if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
buffer.flush(topicStr);
} else {
- log.debug("buffer size too small to flush: bufferData.size() {} < config.getHdfsBatchSize() {}", buffer.getData().size(), config.getHdfsBatchSize());
+ log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index 32d21c62..f3462e49 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -131,7 +131,14 @@ public class MongodbService {
@PreDestroy
public void cleanUp() {
- mongoClient.close();
+ config.getShutdownLock().readLock().lock();
+
+ try {
+ log.info("mongoClient.close() at cleanUp.");
+ mongoClient.close();
+ } finally {
+ config.getShutdownLock().readLock().unlock();
+ }
}
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 7ed88797..84d5f337 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -52,7 +52,7 @@ public class PullService {
@Autowired
private TopicConfigPollingService topicConfigPollingService;
-
+
@Autowired
private ApplicationConfiguration config;
@@ -80,7 +80,7 @@ public class PullService {
for (int i = 0; i < numConsumers; i++) {
executorService.submit(puller);
}
-
+
topicConfigPollingThread = new Thread(topicConfigPollingService);
topicConfigPollingThread.setName("TopicConfigPolling");
topicConfigPollingThread.start();
@@ -98,22 +98,27 @@ public class PullService {
return;
}
- logger.info("stop pulling ...");
- puller.shutdown();
+ config.getShutdownLock().writeLock().lock();
+ try {
+ logger.info("stop pulling ...");
+ puller.shutdown();
- logger.info("stop TopicConfigPollingService ...");
- topicConfigPollingService.shutdown();
+ logger.info("stop TopicConfigPollingService ...");
+ topicConfigPollingService.shutdown();
- try {
topicConfigPollingThread.join();
-
+
executorService.shutdown();
executorService.awaitTermination(120L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.error("executor.awaitTermination", e);
+ logger.error("shutdown(): executor.awaitTermination", e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("shutdown error.", e);
+ } finally {
+ config.getShutdownLock().writeLock().unlock();
}
-
+
isRunning = false;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 58b27834..21e1a08f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -37,7 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service to check topic changes in Kafka and topic setting updates
+ * Service to check topic changes in Kafka and topic setting updates in DB
*
* @author Guobiao Mo
*
@@ -74,7 +74,7 @@ public class TopicConfigPollingService implements Runnable {
}
}
- public boolean isActiveTopicsChanged(boolean update) {
+ public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version
boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
if (changed && update) {
@@ -96,10 +96,13 @@ public class TopicConfigPollingService implements Runnable {
public void run() {
active = true;
log.info("TopicConfigPollingService started.");
-
+
while (active) {
try { //sleep first since we already pool in init()
Thread.sleep(config.getDmaapCheckNewTopicInterval());
+ if(!active) {
+ break;
+ }
} catch (InterruptedException e) {
log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
Thread.currentThread().interrupt();
@@ -131,7 +134,11 @@ public class TopicConfigPollingService implements Runnable {
private List<String> poll() throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
- activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+ Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>();
+
+ activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+ effectiveTopicConfigMap = tempEffectiveTopicConfigMap;
+ log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap);
List<String> ret = new ArrayList<>(activeTopicConfigs.size());
activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
index db4dcfae..5c77d895 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -22,7 +22,6 @@ package org.onap.datalake.feeder.util;
import java.util.HashMap;
-import org.apache.commons.collections.CollectionUtils;
import org.json.JSONArray;
import org.json.JSONObject;