summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-05-01 17:17:38 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-05-02 16:48:20 -0700
commit7978e16b4f73845ae2c897d27074227f22af7acc (patch)
tree99547561852bc3205664bfd22116a3e6720e296e /components/datalake-handler/feeder/src/main/java/org
parent2920426bd0f8369a178895138e97b0b19372c413 (diff)
Handle duplicate id
Issue-ID: DCAEGEN2-1411 Change-Id: Ib39b39ff276d7d5c3dbfaa281df6104926fa354d Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java160
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java32
4 files changed, 103 insertions, 99 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index c618f57f..e957c0f4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -107,10 +107,6 @@ public class Topic {
this.name = name;
}
- public boolean isDefault() {
- return "_DL_DEFAULT_".equals(name);
- }
-
public boolean isEnabled() {
return is(enabled);
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 12d03ee6..f5ee5b79 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -106,7 +106,11 @@ public class CouchbaseService {
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
- saveDocuments(documents);
+ try {
+ saveDocuments(documents);
+ }catch(Exception e) {
+ log.error("error saving to Couchbase.", e);
+ }
log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 4090e7eb..c354f175 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -35,7 +35,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -67,21 +67,20 @@ public class ElasticsearchService {
@Autowired
private ApplicationConfiguration config;
-
+
@Autowired
private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
-
-//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
-//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
+ //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
+ //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
@PostConstruct
private void init() {
Db elasticsearch = dbService.getElasticsearch();
String elasticsearchHost = elasticsearch.getHost();
-
+
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
@@ -104,47 +103,47 @@ public class ElasticsearchService {
public void cleanUp() throws IOException {
client.close();
}
-
+
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
-
- GetIndexRequest request = new GetIndexRequest(topicLower);
-
+
+ GetIndexRequest request = new GetIndexRequest(topicLower);
+
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
- if(!exists){
+ if (!exists) {
//TODO submit mapping template
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
- CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
+ CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
}
}
-
+
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
BulkRequest request = new BulkRequest();
for (JSONObject json : jsons) {
- if(topic.isCorrelateClearedMessage()) {
+ if (topic.isCorrelateClearedMessage()) {
boolean found = correlateClearedMessage(topic, json);
- if(found) {
+ if (found) {
continue;
}
}
-
+
String id = topic.getMessageId(json); //id can be null
request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
-
- if(config.isAsync()) {
- client.bulkAsync(request, RequestOptions.DEFAULT, listener);
- }else {
+
+ if (config.isAsync()) {
+ client.bulkAsync(request, RequestOptions.DEFAULT, listener);
+ } else {
try {
client.bulk(request, RequestOptions.DEFAULT);
- } catch (IOException e) {
- log.error( topic.getName() , e);
+ } catch (IOException e) {
+ log.error(topic.getName(), e);
}
}
}
@@ -155,9 +154,10 @@ public class ElasticsearchService {
* @param json
* @return boolean
*
- * Because of query by id, The search API cannot be used for query.
- * The search API can only query all data or based on the fields in the source.
- * So use the get API, three parameters: index, type, document id
+ * Because of query by id, The search API cannot be used for query. The
+ * search API can only query all data or based on the fields in the
+ * source. So use the get API, three parameters: index, type, document
+ * id
*/
private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
boolean found = false;
@@ -166,75 +166,67 @@ public class ElasticsearchService {
try {
eName = json.query("/event/commonEventHeader/eventName").toString();
- if (StringUtils.isNotBlank(eName)) {
-
- if (eName.endsWith("Cleared")) {
-
- String name = eName.substring(0, eName.length() - 7);
- String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
- String specificProblem = json.query("/event/faultFields/specificProblem").toString();
-
- String id = null;
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
-
- id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
- String index = topic.getName().toLowerCase();
-
- //get
- GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
-
- GetResponse getResponse = null;
- try {
- getResponse = client.get(getRequest, RequestOptions.DEFAULT);
- if (getResponse != null) {
-
- if (getResponse.isExists()) {
- String sourceAsString = getResponse.getSourceAsString();
- JSONObject jsonObject = new JSONObject(sourceAsString);
- jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
- String jsonString = jsonObject.toString();
-
- //update
- IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
- request.source(jsonString, XContentType.JSON);
- IndexResponse indexResponse = null;
- try {
- indexResponse = client.index(request, RequestOptions.DEFAULT);
- found = true;
- } catch (IOException e) {
- log.error("save failure");
- }
- } else {
- log.error("The getResponse was not exists" );
+ if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) {
+
+ String name = eName.substring(0, eName.length() - 7);
+ String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+ String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+ String id = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+ id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+ String index = topic.getName().toLowerCase();
+
+ //get
+ GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+ GetResponse getResponse = null;
+ try {
+ getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse != null) {
+
+ if (getResponse.isExists()) {
+ String sourceAsString = getResponse.getSourceAsString();
+ JSONObject jsonObject = new JSONObject(sourceAsString);
+ jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+ String jsonString = jsonObject.toString();
+
+ //update
+ IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+ request.source(jsonString, XContentType.JSON);
+ IndexResponse indexResponse = null;
+ try {
+ indexResponse = client.index(request, RequestOptions.DEFAULT);
+ found = true;
+ } catch (IOException e) {
+ log.error("save failure");
}
-
} else {
- log.error("The document for this id was not found" );
+ log.error("The getResponse was not exists");
}
- } catch (ElasticsearchException e) {
- if (e.status() == RestStatus.NOT_FOUND) {
- log.error("The document for this id was not found" );
- }
- if (e.status() == RestStatus.CONFLICT) {
- log.error("Version conflict" );
- }
- log.error("Get document exception", e);
- }catch (IOException e) {
- log.error(topic.getName() , e);
+ } else {
+ log.error("The document for this id was not found");
}
- } else {
- log.info("The data is normal");
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ log.error("The document for this id was not found");
+ }
+ if (e.status() == RestStatus.CONFLICT) {
+ log.error("Version conflict");
+ }
+ log.error("Get document exception", e);
+ } catch (IOException e) {
+ log.error(topic.getName(), e);
}
- } else {
- log.debug("event id null");
}
} catch (Exception e) {
- log.error("error",e);
+ log.error("error", e);
}
return found;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index 02c80a45..c5408951 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
@@ -48,6 +50,7 @@ import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
/**
* Service for using MongoDB
@@ -62,7 +65,7 @@ public class MongodbService {
@Autowired
private ApplicationConfiguration config;
- private boolean dbReady = false;
+ private boolean dbReady = false;
@Autowired
private DbService dbService;
@@ -70,6 +73,7 @@ public class MongodbService {
private MongoDatabase database;
private MongoClient mongoClient;
private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+ private InsertManyOptions insertManyOptions;
@PostConstruct
private void init() {
@@ -103,26 +107,26 @@ public class MongodbService {
addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address
-
try {
- if(StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password))
- {
+ if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) {
credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray());
List<MongoCredential> credentialList = new ArrayList<MongoCredential>();
credentialList.add(credential);
mongoClient = new MongoClient(addrs, credentialList, options);
- }else
- {
+ } else {
mongoClient = new MongoClient(addrs, options);
}
- }catch(Exception ex){
+ } catch (Exception ex) {
dbReady = false;
log.error("Fail to initiate MongoDB" + mongodb.getHost());
return;
}
database = mongoClient.getDatabase(mongodb.getDatabase());
- dbReady = true;
+ insertManyOptions = new InsertManyOptions();
+ insertManyOptions.ordered(false);
+
+ dbReady = true;
}
@PreDestroy
@@ -131,7 +135,7 @@ public class MongodbService {
}
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- if(dbReady == false)
+ if (dbReady == false)
return;
List<Document> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
@@ -147,7 +151,15 @@ public class MongodbService {
String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ .
MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
- collection.insertMany(documents);
+
+ try {
+ collection.insertMany(documents, insertManyOptions);
+ } catch (MongoBulkWriteException e) {
+ List<BulkWriteError> bulkWriteErrors = e.getWriteErrors();
+ for (BulkWriteError bulkWriteError : bulkWriteErrors) {
+ log.error("Failed record: {}", bulkWriteError);
+ }
+ }
log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
}