summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java39
1 files changed, 27 insertions, 12 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index cbcc5f86..1f637e1a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,14 +62,18 @@ public class ElasticsearchService {
@Autowired
private ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
@PostConstruct
private void init() {
- String elasticsearchHost = config.getElasticsearchHost();
-
+ Db elasticsearch = dbService.getElasticsearch();
+ String elasticsearchHost = elasticsearch.getHost();
+
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
@@ -93,14 +99,16 @@ public class ElasticsearchService {
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
-
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
- try {
+
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(topicLower);
+
+ boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+ if(!exists){
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }catch(ElasticsearchStatusException e) {
- log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
@@ -114,7 +122,10 @@ public class ElasticsearchService {
continue;
}
}
- request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+
+ String id = topic.getMessageId(json); //id can be null
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -122,19 +133,23 @@ public class ElasticsearchService {
try {
client.bulk(request, RequestOptions.DEFAULT);
} catch (IOException e) {
- log.error( topic.getId() , e);
+ log.error( topic.getName() , e);
}
}
}
private boolean correlateClearedMessage(JSONObject json) {
- boolean found = false;
-
+ boolean found = true;
+
/*TODO
* 1. check if this is a alarm cleared message
* 2. search previous alarm message
* 3. update previous message, if success, set found=true
*/
+ //for Sonar test, remove the following
+ if(json.isNull("kkkkk")) {
+ found = false;
+ }
return found;
}