summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYan Yang <yangyanyj@chinamobile.com>2019-04-26 00:31:52 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-26 00:31:52 +0000
commit2cde61f144cf41457573a1abaf30c6ebe6674c17 (patch)
tree6d8269bc46b660acc6e87ae6d701d11364e1caaa
parent49de3c861e6baca6474d47cf45a042302468c9f7 (diff)
parentf18883b4aa42334c39b523ead45ecb18ed2f5e92 (diff)
Merge "‘correlateClearedMessage’"
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java115
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties1
4 files changed, 102 insertions, 17 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index d674ead1..44f4ef17 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -52,6 +52,7 @@ insert into db (`name`,`host`) values ('Druid','dl_druid');
-- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
+insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 21ca1b70..a3add0ed 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -57,4 +57,6 @@ public class ApplicationConfiguration {
private String rawDataLabel;
private String defaultTopicName;
+
+ private String elasticsearchType;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 2c16b2b8..30aa7332 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -26,8 +26,13 @@ import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
@@ -38,6 +43,7 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
@@ -118,15 +124,15 @@ public class ElasticsearchService {
for (JSONObject json : jsons) {
if(topic.isCorrelateClearedMessage()) {
- boolean found = correlateClearedMessage(json);
+ boolean found = correlateClearedMessage(topic, json);
if(found) {
continue;
}
}
String id = topic.getMessageId(json); //id can be null
-
- request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -138,21 +144,96 @@ public class ElasticsearchService {
}
}
}
-
- private boolean correlateClearedMessage(JSONObject json) {
- boolean found = true;
-
- /*TODO
- * 1. check if this is a alarm cleared message
- * 2. search previous alarm message
- * 3. update previous message, if success, set found=true
- */
- //for Sonar test, remove the following
- if(json.isNull("kkkkk")) {
- found = false;
+
+ /**
+ *
+ * @param topic
+ * @param json
+ * @return boolean
+ *
+ * Because of query by id, The search API cannot be used for query.
+ * The search API can only query all data or based on the fields in the source.
+ * So use the get API, three parameters: index, type, document id
+ */
+ private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+ boolean found = false;
+ String eName = null;
+
+ try {
+ eName = json.query("/event/commonEventHeader/eventName").toString();
+
+ if (StringUtils.isNotBlank(eName)) {
+
+ if (eName.endsWith("Cleared")) {
+
+ String name = eName.substring(0, eName.length() - 7);
+ String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+ String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+ String id = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+ id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+ String index = topic.getName().toLowerCase();
+
+ //get
+ GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+ GetResponse getResponse = null;
+ try {
+ getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse != null) {
+
+ if (getResponse.isExists()) {
+ String sourceAsString = getResponse.getSourceAsString();
+ JSONObject jsonObject = new JSONObject(sourceAsString);
+ jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+ String jsonString = jsonObject.toString();
+
+ //update
+ IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+ request.source(jsonString, XContentType.JSON);
+ IndexResponse indexResponse = null;
+ try {
+ indexResponse = client.index(request, RequestOptions.DEFAULT);
+ found = true;
+ } catch (IOException e) {
+ log.error("save failure");
+ }
+ } else {
+ log.error("The getResponse was not exists" );
+ }
+
+ } else {
+ log.error("The document for this id was not found" );
+ }
+
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ log.error("The document for this id was not found" );
+ }
+ if (e.status() == RestStatus.CONFLICT) {
+ log.error("Version conflict" );
+ }
+ log.error("Get document exception", e);
+ }catch (IOException e) {
+ log.error(topic.getName() , e);
+ }
+
+ } else {
+ log.info("The data is normal");
+ }
+
+ } else {
+ log.debug("event id null");
+ }
+
+ } catch (Exception e) {
+ log.error("error",e);
}
-
- return found;
+
+ return found;
}
}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index 1bd53392..dfe48a29 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -44,6 +44,7 @@ rawDataLabel=datalake_text_
defaultTopicName=_DL_DEFAULT_
+elasticsearchType=doc
#Logging
logging.level.org.springframework.web=ERROR