From 7978e16b4f73845ae2c897d27074227f22af7acc Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Wed, 1 May 2019 17:17:38 -0700 Subject: Handle duplicate id Issue-ID: DCAEGEN2-1411 Change-Id: Ib39b39ff276d7d5c3dbfaa281df6104926fa354d Signed-off-by: Guobiao Mo --- .../org/onap/datalake/feeder/domain/Topic.java | 4 - .../datalake/feeder/service/CouchbaseService.java | 6 +- .../feeder/service/ElasticsearchService.java | 160 ++++++++++----------- .../datalake/feeder/service/MongodbService.java | 32 +++-- 4 files changed, 103 insertions(+), 99 deletions(-) (limited to 'components/datalake-handler/feeder/src/main/java/org') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index c618f57f..e957c0f4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -107,10 +107,6 @@ public class Topic { this.name = name; } - public boolean isDefault() { - return "_DL_DEFAULT_".equals(name); - } - public boolean isEnabled() { return is(enabled); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index 12d03ee6..f5ee5b79 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -106,7 +106,11 @@ public class CouchbaseService { JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } - saveDocuments(documents); + try { + saveDocuments(documents); + }catch(Exception e) { + log.error("error saving to Couchbase.", e); + } log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 4090e7eb..c354f175 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -35,7 +35,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -67,21 +67,20 @@ public class ElasticsearchService { @Autowired private ApplicationConfiguration config; - + @Autowired private DbService dbService; private RestHighLevelClient client; ActionListener listener; - -//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication -//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html + //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication + //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html @PostConstruct private void init() { Db elasticsearch = dbService.getElasticsearch(); String elasticsearchHost = elasticsearch.getHost(); - + // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); @@ -104,47 +103,47 @@ public class ElasticsearchService { public void cleanUp() throws IOException { client.close(); } - + public void ensureTableExist(String topic) throws IOException { String topicLower = topic.toLowerCase(); - - GetIndexRequest request = new GetIndexRequest(topicLower); - + + GetIndexRequest request = new GetIndexRequest(topicLower); + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); - if(!exists){ + if (!exists) { //TODO submit mapping template - CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); - CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged()); } } - + //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME public void saveJsons(TopicConfig topic, List jsons) { BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { - if(topic.isCorrelateClearedMessage()) { + if (topic.isCorrelateClearedMessage()) { boolean found = correlateClearedMessage(topic, json); - if(found) { + if (found) { continue; } } - + String id = topic.getMessageId(json); //id can be null request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); - - if(config.isAsync()) { - client.bulkAsync(request, RequestOptions.DEFAULT, listener); - }else { + + if (config.isAsync()) { + client.bulkAsync(request, RequestOptions.DEFAULT, listener); + } else { try { client.bulk(request, RequestOptions.DEFAULT); - } catch (IOException e) { - log.error( topic.getName() , e); + } catch (IOException e) { + log.error(topic.getName(), e); } } } @@ -155,9 +154,10 @@ public class ElasticsearchService { * @param json * @return boolean * - * Because of query by id, The search API cannot be used for query. - * The search API can only query all data or based on the fields in the source. - * So use the get API, three parameters: index, type, document id + * Because of query by id, The search API cannot be used for query. The + * search API can only query all data or based on the fields in the + * source. So use the get API, three parameters: index, type, document + * id */ private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { boolean found = false; @@ -166,75 +166,67 @@ public class ElasticsearchService { try { eName = json.query("/event/commonEventHeader/eventName").toString(); - if (StringUtils.isNotBlank(eName)) { - - if (eName.endsWith("Cleared")) { - - String name = eName.substring(0, eName.length() - 7); - String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); - String specificProblem = json.query("/event/faultFields/specificProblem").toString(); - - String id = null; - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem); - - id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb" - String index = topic.getName().toLowerCase(); - - //get - GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); - - GetResponse getResponse = null; - try { - getResponse = client.get(getRequest, RequestOptions.DEFAULT); - if (getResponse != null) { - - if (getResponse.isExists()) { - String sourceAsString = getResponse.getSourceAsString(); - JSONObject jsonObject = new JSONObject(sourceAsString); - jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); - String jsonString = jsonObject.toString(); - - //update - IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); - request.source(jsonString, XContentType.JSON); - IndexResponse indexResponse = null; - try { - indexResponse = client.index(request, RequestOptions.DEFAULT); - found = true; - } catch (IOException e) { - log.error("save failure"); - } - } else { - log.error("The getResponse was not exists" ); + if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) { + + String name = eName.substring(0, eName.length() - 7); + String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); + String specificProblem = json.query("/event/faultFields/specificProblem").toString(); + + String id = null; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem); + + id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb" + String index = topic.getName().toLowerCase(); + + //get + GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); + + GetResponse getResponse = null; + try { + getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse != null) { + + if (getResponse.isExists()) { + String sourceAsString = getResponse.getSourceAsString(); + JSONObject jsonObject = new JSONObject(sourceAsString); + jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); + String jsonString = jsonObject.toString(); + + //update + IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); + request.source(jsonString, XContentType.JSON); + IndexResponse indexResponse = null; + try { + indexResponse = client.index(request, RequestOptions.DEFAULT); + found = true; + } catch (IOException e) { + log.error("save failure"); } - } else { - log.error("The document for this id was not found" ); + log.error("The getResponse was not exists"); } - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.NOT_FOUND) { - log.error("The document for this id was not found" ); - } - if (e.status() == RestStatus.CONFLICT) { - log.error("Version conflict" ); - } - log.error("Get document exception", e); - }catch (IOException e) { - log.error(topic.getName() , e); + } else { + log.error("The document for this id was not found"); } - } else { - log.info("The data is normal"); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + log.error("The document for this id was not found"); + } + if (e.status() == RestStatus.CONFLICT) { + log.error("Version conflict"); + } + log.error("Get document exception", e); + } catch (IOException e) { + log.error(topic.getName(), e); } - } else { - log.debug("event id null"); } } catch (Exception e) { - log.error("error",e); + log.error("error", e); } return found; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index 02c80a45..c5408951 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.mongodb.bulk.BulkWriteError; +import com.mongodb.MongoBulkWriteException; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; @@ -48,6 +50,7 @@ import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertManyOptions; /** * Service for using MongoDB @@ -62,7 +65,7 @@ public class MongodbService { @Autowired private ApplicationConfiguration config; - private boolean dbReady = false; + private boolean dbReady = false; @Autowired private DbService dbService; @@ -70,6 +73,7 @@ public class MongodbService { private MongoDatabase database; private MongoClient mongoClient; private Map> mongoCollectionMap = new HashMap<>(); + private InsertManyOptions insertManyOptions; @PostConstruct private void init() { @@ -103,26 +107,26 @@ public class MongodbService { addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address - try { - if(StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) - { + if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) { credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray()); List credentialList = new ArrayList(); credentialList.add(credential); mongoClient = new MongoClient(addrs, credentialList, options); - }else - { + } else { mongoClient = new MongoClient(addrs, options); } - }catch(Exception ex){ + } catch (Exception ex) { dbReady = false; log.error("Fail to initiate MongoDB" + mongodb.getHost()); return; } database = mongoClient.getDatabase(mongodb.getDatabase()); - dbReady = true; + insertManyOptions = new InsertManyOptions(); + insertManyOptions.ordered(false); + + dbReady = true; } @PreDestroy @@ -131,7 +135,7 @@ public class MongodbService { } public void saveJsons(TopicConfig topic, List jsons) { - if(dbReady == false) + if (dbReady == false) return; List documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { @@ -147,7 +151,15 @@ public class MongodbService { String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . MongoCollection collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); - collection.insertMany(documents); + + try { + collection.insertMany(documents, insertManyOptions); + } catch (MongoBulkWriteException e) { + List bulkWriteErrors = e.getWriteErrors(); + for (BulkWriteError bulkWriteError : bulkWriteErrors) { + log.error("Failed record: {}", bulkWriteError); + } + } log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); } -- cgit 1.2.3-korg