diff options
author | Yan Yang <yangyanyj@chinamobile.com> | 2019-05-03 03:26:24 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-05-03 03:26:24 +0000 |
commit | a5336234db7c92232371b3bcc28817baaf4144f0 (patch) | |
tree | 6a56dffb31f60f2ab3220785c458f89294964de4 /components/datalake-handler/feeder | |
parent | 3d1fcfc96a550893657ec4df15a1ce852e7feda4 (diff) | |
parent | 7978e16b4f73845ae2c897d27074227f22af7acc (diff) |
Merge "Handle duplicate id"
Diffstat (limited to 'components/datalake-handler/feeder')
6 files changed, 116 insertions, 103 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index c618f57f..e957c0f4 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -107,10 +107,6 @@ public class Topic { this.name = name; } - public boolean isDefault() { - return "_DL_DEFAULT_".equals(name); - } - public boolean isEnabled() { return is(enabled); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index 12d03ee6..f5ee5b79 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -106,7 +106,11 @@ public class CouchbaseService { JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } - saveDocuments(documents); + try { + saveDocuments(documents); + }catch(Exception e) { + log.error("error saving to Couchbase.", e); + } log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 4090e7eb..c354f175 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -35,7 +35,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -67,21 +67,20 @@ public class ElasticsearchService { @Autowired private ApplicationConfiguration config; - + @Autowired private DbService dbService; private RestHighLevelClient client; ActionListener<BulkResponse> listener; - -//ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication -//Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html + //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication + //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html @PostConstruct private void init() { Db elasticsearch = dbService.getElasticsearch(); String elasticsearchHost = elasticsearch.getHost(); - + // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); @@ -104,47 +103,47 @@ public class ElasticsearchService { public void cleanUp() throws IOException { client.close(); } - + public void ensureTableExist(String topic) throws IOException { String topicLower = topic.toLowerCase(); - - GetIndexRequest request = new GetIndexRequest(topicLower); - + + GetIndexRequest request = new GetIndexRequest(topicLower); + boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); - if(!exists){ + if (!exists) { //TODO submit mapping template - CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); - CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged()); } } - + //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { - if(topic.isCorrelateClearedMessage()) { + if (topic.isCorrelateClearedMessage()) { boolean found = correlateClearedMessage(topic, json); - if(found) { + if (found) { continue; } } - + String id = topic.getMessageId(json); //id can be null request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); - - if(config.isAsync()) { - client.bulkAsync(request, RequestOptions.DEFAULT, listener); - }else { + + if (config.isAsync()) { + client.bulkAsync(request, RequestOptions.DEFAULT, listener); + } else { try { client.bulk(request, RequestOptions.DEFAULT); - } catch (IOException e) { - log.error( topic.getName() , e); + } catch (IOException e) { + log.error(topic.getName(), e); } } } @@ -155,9 +154,10 @@ public class ElasticsearchService { * @param json * @return boolean * - * Because of query by id, The search API cannot be used for query. - * The search API can only query all data or based on the fields in the source. - * So use the get API, three parameters: index, type, document id + * Because of query by id, The search API cannot be used for query. The + * search API can only query all data or based on the fields in the + * source. So use the get API, three parameters: index, type, document + * id */ private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { boolean found = false; @@ -166,75 +166,67 @@ public class ElasticsearchService { try { eName = json.query("/event/commonEventHeader/eventName").toString(); - if (StringUtils.isNotBlank(eName)) { - - if (eName.endsWith("Cleared")) { - - String name = eName.substring(0, eName.length() - 7); - String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); - String specificProblem = json.query("/event/faultFields/specificProblem").toString(); - - String id = null; - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem); - - id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb" - String index = topic.getName().toLowerCase(); - - //get - GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); - - GetResponse getResponse = null; - try { - getResponse = client.get(getRequest, RequestOptions.DEFAULT); - if (getResponse != null) { - - if (getResponse.isExists()) { - String sourceAsString = getResponse.getSourceAsString(); - JSONObject jsonObject = new JSONObject(sourceAsString); - jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); - String jsonString = jsonObject.toString(); - - //update - IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); - request.source(jsonString, XContentType.JSON); - IndexResponse indexResponse = null; - try { - indexResponse = client.index(request, RequestOptions.DEFAULT); - found = true; - } catch (IOException e) { - log.error("save failure"); - } - } else { - log.error("The getResponse was not exists" ); + if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) { + + String name = eName.substring(0, eName.length() - 7); + String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); + String specificProblem = json.query("/event/faultFields/specificProblem").toString(); + + String id = null; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem); + + id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb" + String index = topic.getName().toLowerCase(); + + //get + GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); + + GetResponse getResponse = null; + try { + getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse != null) { + + if (getResponse.isExists()) { + String sourceAsString = getResponse.getSourceAsString(); + JSONObject jsonObject = new JSONObject(sourceAsString); + jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); + String jsonString = jsonObject.toString(); + + //update + IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); + request.source(jsonString, XContentType.JSON); + IndexResponse indexResponse = null; + try { + indexResponse = client.index(request, RequestOptions.DEFAULT); + found = true; + } catch (IOException e) { + log.error("save failure"); } - } else { - log.error("The document for this id was not found" ); + log.error("The getResponse was not exists"); } - } catch (ElasticsearchException e) { - if (e.status() == RestStatus.NOT_FOUND) { - log.error("The document for this id was not found" ); - } - if (e.status() == RestStatus.CONFLICT) { - log.error("Version conflict" ); - } - log.error("Get document exception", e); - }catch (IOException e) { - log.error(topic.getName() , e); + } else { + log.error("The document for this id was not found"); } - } else { - log.info("The data is normal"); + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + log.error("The document for this id was not found"); + } + if (e.status() == RestStatus.CONFLICT) { + log.error("Version conflict"); + } + log.error("Get document exception", e); + } catch (IOException e) { + log.error(topic.getName(), e); } - } else { - log.debug("event id null"); } } catch (Exception e) { - log.error("error",e); + log.error("error", e); } return found; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index 02c80a45..c5408951 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import com.mongodb.bulk.BulkWriteError; +import com.mongodb.MongoBulkWriteException; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; @@ -48,6 +50,7 @@ import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertManyOptions; /** * Service for using MongoDB @@ -62,7 +65,7 @@ public class MongodbService { @Autowired private ApplicationConfiguration config; - private boolean dbReady = false; + private boolean dbReady = false; @Autowired private DbService dbService; @@ -70,6 +73,7 @@ public class MongodbService { private MongoDatabase database; private MongoClient mongoClient; private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>(); + private InsertManyOptions insertManyOptions; @PostConstruct private void init() { @@ -103,26 +107,26 @@ public class MongodbService { addrs.add(new ServerAddress(host, port)); // FIXME should be a list of address - try { - if(StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) - { + if (StringUtils.isNoneBlank(userName) && StringUtils.isNoneBlank(password)) { credential = MongoCredential.createCredential(userName, databaseName, password.toCharArray()); List<MongoCredential> credentialList = new ArrayList<MongoCredential>(); credentialList.add(credential); mongoClient = new MongoClient(addrs, credentialList, options); - }else - { + } else { mongoClient = new MongoClient(addrs, options); } - }catch(Exception ex){ + } catch (Exception ex) { dbReady = false; log.error("Fail to initiate MongoDB" + mongodb.getHost()); return; } database = mongoClient.getDatabase(mongodb.getDatabase()); - dbReady = true; + insertManyOptions = new InsertManyOptions(); + insertManyOptions.ordered(false); + + dbReady = true; } @PreDestroy @@ -131,7 +135,7 @@ public class MongodbService { } public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - if(dbReady == false) + if (dbReady == false) return; List<Document> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { @@ -147,7 +151,15 @@ public class MongodbService { String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]", "");//remove - _ . MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); - collection.insertMany(documents); + + try { + collection.insertMany(documents, insertManyOptions); + } catch (MongoBulkWriteException e) { + List<BulkWriteError> bulkWriteErrors = e.getWriteErrors(); + for (BulkWriteError bulkWriteError : bulkWriteErrors) { + log.error("Failed record: {}", bulkWriteError); + } + } log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); } diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java index 74f0884f..4397e914 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -68,6 +68,7 @@ public class TopicTest { assertTrue(testTopic.equals(new Topic("test"))); assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); + assertEquals(testTopic.toString(), "test"); defaultTopic.setDbs(new HashSet<>()); defaultTopic.getDbs().add(new Db("Elasticsearch")); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java index c65e920e..dc9feedc 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java @@ -28,6 +28,7 @@ import java.util.HashSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -79,13 +80,20 @@ public class TopicConfigTest { public void testIs() { Topic testTopic = new Topic("test"); - assertTrue(testTopic.equals(new Topic("test"))); - assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); - + TopicConfig testTopicConfig = testTopic.getTopicConfig(); + testTopicConfig.setSinkdbs(null); + assertFalse(testTopicConfig.supportElasticsearch()); + assertNull(testTopicConfig.getDataFormat2()); + testTopic.setDbs(new HashSet<>()); testTopic.getDbs().add(new Db("Elasticsearch")); - TopicConfig testTopicConfig = testTopic.getTopicConfig(); + testTopicConfig = testTopic.getTopicConfig(); + + assertEquals(testTopicConfig, new Topic("test").getTopicConfig()); + assertNotEquals(testTopicConfig, testTopic); + assertNotEquals(testTopicConfig, null); + assertEquals(testTopicConfig.hashCode(), (new Topic("test").getTopicConfig()).hashCode()); assertTrue(testTopicConfig.supportElasticsearch()); assertFalse(testTopicConfig.supportCouchbase()); |