diff options
author | ZhangZihao <zhangzihao@chinamobile.com> | 2019-04-24 16:59:34 +0800 |
---|---|---|
committer | Zihao Zhang <zhangzihao@chinamobile.com> | 2019-04-25 08:57:02 +0000 |
commit | f18883b4aa42334c39b523ead45ecb18ed2f5e92 (patch) | |
tree | 6d22ba19448e2404ffc02a47284843b4b660ec39 /components/datalake-handler/feeder | |
parent | c82dafde6b3f02cccba0822a340cb9d7bed35a1c (diff) |
‘correlateClearedMessage’
Change-Id: I32ea4043d32f29c920b370de3c84341c218ed5c2
Issue-ID: DCAEGEN2-1190
Signed-off-by: ZhangZihao <zhangzihao@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
4 files changed, 102 insertions, 17 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index d674ead1..44f4ef17 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -52,6 +52,7 @@ insert into db (`name`,`host`) values ('Druid','dl_druid'); -- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
+insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 21ca1b70..a3add0ed 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -57,4 +57,6 @@ public class ApplicationConfiguration { private String rawDataLabel; private String defaultTopicName; + + private String elasticsearchType; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 2c16b2b8..30aa7332 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -26,8 +26,13 @@ import java.util.List; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; @@ -38,6 +43,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; @@ -118,15 +124,15 @@ public class ElasticsearchService { for (JSONObject json : jsons) { if(topic.isCorrelateClearedMessage()) { - boolean found = correlateClearedMessage(json); + boolean found = correlateClearedMessage(topic, json); if(found) { continue; } } String id = topic.getMessageId(json); //id can be null - - request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON)); + + request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } if(config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); @@ -138,21 +144,96 @@ public class ElasticsearchService { } } } - - private boolean correlateClearedMessage(JSONObject json) { - boolean found = true; - - /*TODO - * 1. check if this is a alarm cleared message - * 2. search previous alarm message - * 3. update previous message, if success, set found=true - */ - //for Sonar test, remove the following - if(json.isNull("kkkkk")) { - found = false; + + /** + * + * @param topic + * @param json + * @return boolean + * + * Because of query by id, The search API cannot be used for query. + * The search API can only query all data or based on the fields in the source. + * So use the get API, three parameters: index, type, document id + */ + private boolean correlateClearedMessage(Topic topic, JSONObject json) { + boolean found = false; + String eName = null; + + try { + eName = json.query("/event/commonEventHeader/eventName").toString(); + + if (StringUtils.isNotBlank(eName)) { + + if (eName.endsWith("Cleared")) { + + String name = eName.substring(0, eName.length() - 7); + String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString(); + String specificProblem = json.query("/event/faultFields/specificProblem").toString(); + + String id = null; + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem); + + id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb" + String index = topic.getName().toLowerCase(); + + //get + GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id); + + GetResponse getResponse = null; + try { + getResponse = client.get(getRequest, RequestOptions.DEFAULT); + if (getResponse != null) { + + if (getResponse.isExists()) { + String sourceAsString = getResponse.getSourceAsString(); + JSONObject jsonObject = new JSONObject(sourceAsString); + jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed"); + String jsonString = jsonObject.toString(); + + //update + IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id); + request.source(jsonString, XContentType.JSON); + IndexResponse indexResponse = null; + try { + indexResponse = client.index(request, RequestOptions.DEFAULT); + found = true; + } catch (IOException e) { + log.error("save failure"); + } + } else { + log.error("The getResponse was not exists" ); + } + + } else { + log.error("The document for this id was not found" ); + } + + } catch (ElasticsearchException e) { + if (e.status() == RestStatus.NOT_FOUND) { + log.error("The document for this id was not found" ); + } + if (e.status() == RestStatus.CONFLICT) { + log.error("Version conflict" ); + } + log.error("Get document exception", e); + }catch (IOException e) { + log.error(topic.getName() , e); + } + + } else { + log.info("The data is normal"); + } + + } else { + log.debug("event id null"); + } + + } catch (Exception e) { + log.error("error",e); } - - return found; + + return found; } } diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index 1bd53392..dfe48a29 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -44,6 +44,7 @@ rawDataLabel=datalake_text_ defaultTopicName=_DL_DEFAULT_ +elasticsearchType=doc #Logging logging.level.org.springframework.web=ERROR |